Skip to content

Commit

Permalink
Add NATS publisher support to reminder
Browse files Browse the repository at this point in the history
Signed-off-by: Vyom Yadav <[email protected]>
  • Loading branch information
Vyom-Yadav committed Oct 27, 2024
1 parent 87ea9ed commit 748cb2c
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 35 deletions.
21 changes: 14 additions & 7 deletions config/reminder-config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ logging:
level: "debug"

events:
sql_connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
driver: "sql"
# driver: "cloudevents-nats"
sql:
connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
# nats:
# url: "nats://localhost:4222"
# prefix: "minder"
# queue: "minder"
2 changes: 1 addition & 1 deletion internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func instantiateDriver(
return eventersql.BuildPostgreSQLDriver(ctx, cfg)
case constants.NATSDriver:
zerolog.Ctx(ctx).Info().Msg("Using NATS driver")
return nats.BuildNatsChannelDriver(cfg)
return nats.BuildNatsChannelDriver(&cfg.Nats)
default:
zerolog.Ctx(ctx).Info().Msg("Driver unknown")
return nil, nil, nil, fmt.Errorf("unknown driver %s", driver)
Expand Down
12 changes: 8 additions & 4 deletions internal/events/nats/natschannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/events/common"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/config"
)

// BuildNatsChannelDriver creates a new event driver using
// CloudEvents with the NATS-JetStream transport
func BuildNatsChannelDriver(cfg *serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) {
adapter := &cloudEventsNatsAdapter{cfg: &cfg.Nats}
func BuildNatsChannelDriver(cfg *config.NatsConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) {
if cfg == nil {
return nil, nil, nil, fmt.Errorf("NATS config is nil")
}

adapter := &cloudEventsNatsAdapter{cfg: cfg}
return adapter, adapter, func() {}, nil
}

// CloudEventsNatsPublisher actually consumes a _set_ of NATS topics,
// because CloudEvents-Jetstream has a separate Consumer for each topic
type cloudEventsNatsAdapter struct {
cfg *serverconfig.NatsConfig
cfg *config.NatsConfig
lock sync.Mutex
// Keep a cache of the topics we subscribe/publish to
topics map[string]topicState
Expand Down
5 changes: 3 additions & 2 deletions internal/events/nats/natschannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"

"github.com/mindersec/minder/internal/events/common"
"github.com/mindersec/minder/pkg/config"
serverconfig "github.com/mindersec/minder/pkg/config/server"
)

Expand All @@ -27,7 +28,7 @@ func TestNatsChannel(t *testing.T) {
}
defer server.Shutdown()
cfg := serverconfig.EventConfig{
Nats: serverconfig.NatsConfig{
Nats: config.NatsConfig{
URL: server.ClientURL(),
Prefix: "test",
Queue: "minder",
Expand Down Expand Up @@ -120,7 +121,7 @@ loop:
}

func buildDriverPair(ctx context.Context, cfg serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, <-chan *message.Message, error) {
pub, sub, closer, err := BuildNatsChannelDriver(&cfg)
pub, sub, closer, err := BuildNatsChannelDriver(&cfg.Nats)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to build nats channel driver: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,33 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/events/common"
natsinternal "github.com/mindersec/minder/internal/events/nats"
"github.com/mindersec/minder/pkg/eventer/constants"
)

func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) {
switch r.cfg.EventConfig.Driver {
case constants.NATSDriver:
return r.setupNATSPublisher(ctx)
case constants.SQLDriver:
return r.setupSQLPublisher(ctx)
default:
return nil, nil, fmt.Errorf("unknown publisher type: %s", r.cfg.EventConfig.Driver)
}
}

func (r *reminder) setupNATSPublisher(_ context.Context) (message.Publisher, common.DriverCloser, error) {
pub, _, cl, err := natsinternal.BuildNatsChannelDriver(&r.cfg.EventConfig.NatsConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create NATS publisher: %w", err)
}
return pub, cl, nil
}

func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) {
logger := zerolog.Ctx(ctx)

db, _, err := r.cfg.EventConfig.Connection.GetDBConnection(ctx)
db, _, err := r.cfg.EventConfig.SQLPubConfig.Connection.GetDBConnection(ctx)
if err != nil {
return nil, nil, fmt.Errorf("unable to connect to events database: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con
logger := zerolog.Ctx(ctx)
logger.Info().Msgf("initial repository cursor: %s", r.repositoryCursor)

pub, cl, err := r.setupSQLPublisher(ctx)
pub, cl, err := r.getMessagePublisher(ctx)
if err != nil {
return nil, err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,14 @@ func ReadKey(keypath string) ([]byte, error) {

return data, nil
}

// NatsConfig is the configuration when using NATS as the event driver
type NatsConfig struct {
// URL is the URL for the NATS server
URL string `mapstructure:"url" default:"nats://localhost:4222"`
// Prefix is the prefix for the NATS subjects to subscribe to
Prefix string `mapstructure:"prefix" default:"minder"`
// Queue is the name of the queue group to join when consuming messages
// queue groups allow multiple process to round-robin process messages.
Queue string `mapstructure:"queue" default:"minder"`
}
14 changes: 8 additions & 6 deletions pkg/config/reminder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ func TestValidateConfig(t *testing.T) {
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: reminder.EventConfig{
Connection: config.DatabaseConfig{
Port: 8080,
SQLPubConfig: reminder.SQLPubConfig{
Connection: config.DatabaseConfig{
Port: 8080,
},
},
},
},
Expand Down Expand Up @@ -153,10 +155,10 @@ func TestSetViperDefaults(t *testing.T) {
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.interval")))
require.Equal(t, 100, v.GetInt("recurrence.batch_size"))
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.min_elapsed")))
require.Equal(t, "reminder", v.GetString("events.sql_connection.dbname"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql_connection.dbuser"))
require.Equal(t, "reminder", v.GetString("events.sql.connection.dbname"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql.connection.dbhost"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql.connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql.connection.dbuser"))
}

// TestOverrideConfigByEnvVar tests that the configuration can be overridden by environment variables
Expand Down
9 changes: 8 additions & 1 deletion pkg/config/reminder/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (

// EventConfig is the configuration for reminder's eventing system.
type EventConfig struct {
Driver string `mapstructure:"driver" default:"sql"`
SQLPubConfig SQLPubConfig `mapstructure:"sql"`
NatsConfig config.NatsConfig `mapstructure:"nats"`
}

// SQLPubConfig is the configuration for the SQL publisher
type SQLPubConfig struct {
// Connection is the configuration for the SQL event driver
//
// nolint: lll
Connection config.DatabaseConfig `mapstructure:"sql_connection" default:"{\"dbname\":\"reminder\",\"dbhost\":\"reminder-event-postgres\"}"`
Connection config.DatabaseConfig `mapstructure:"connection" default:"{\"dbname\":\"reminder\",\"dbhost\":\"reminder-event-postgres\"}"`
}
13 changes: 1 addition & 12 deletions pkg/config/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type EventConfig struct {
// Aggregator is the configuration for the event aggregator middleware
Aggregator AggregatorConfig `mapstructure:"aggregator"`
// Nats is the configuration when using NATS as the event driver
Nats NatsConfig `mapstructure:"nats"`
Nats config.NatsConfig `mapstructure:"nats"`
}

// GoChannelEventConfig is the configuration for the go channel event driver
Expand Down Expand Up @@ -54,14 +54,3 @@ type AggregatorConfig struct {
// This is the threshold between rule evaluations + actions.
LockInterval int64 `mapstructure:"lock_interval" default:"30"`
}

// NatsConfig is the configuration when using NATS as the event driver
type NatsConfig struct {
// URL is the URL for the NATS server
URL string `mapstructure:"url" default:"nats://localhost:4222"`
// Prefix is the prefix for the NATS subjects to subscribe to
Prefix string `mapstructure:"prefix" default:"minder"`
// Queue is the name of the queue group to join when consuming messages
// queue groups allow multiple process to round-robin process messages.
Queue string `mapstructure:"queue" default:"minder"`
}

0 comments on commit 748cb2c

Please sign in to comment.