-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add NATS publisher support to reminder #4829
base: main
Are you sure you want to change the base?
Add NATS publisher support to reminder #4829
Conversation
Signed-off-by: Vyom Yadav <[email protected]>
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ | ||
Name: c.cfg.Prefix, | ||
Subjects: []string{c.cfg.Prefix + ".>"}, | ||
}) | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Newer jetstream API, cleaner code, context support. https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 if this works; I think when I started looking, this wasn't cleaned up / operable with CloudEvents yet.
Signed-off-by: Vyom Yadav <[email protected]>
3d399bf
to
748cb2c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a separate fix going in for the one test failure.
In general, I think you could have separated the changes to internal/events/nats
from the changes to reminder
, which makes it easier to roll back or hold one change or the other if there needs to be discussion. But I'm willing to approve this and then iterate more going forward, modulo the comments, particularly about re-using eventer.New
, rather than duplicating the setup code at this point.
@@ -153,7 +150,7 @@ func instantiateDriver( | |||
return eventersql.BuildPostgreSQLDriver(ctx, cfg) | |||
case constants.NATSDriver: | |||
zerolog.Ctx(ctx).Info().Msg("Using NATS driver") | |||
return nats.BuildNatsChannelDriver(cfg) | |||
return nats.BuildNatsChannelDriver(&cfg.Nats) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the other constructors take the entire cfg
, so that's the model I followed here.
func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) { | ||
switch r.cfg.EventConfig.Driver { | ||
case constants.NATSDriver: | ||
return r.setupNATSPublisher(ctx) | ||
case constants.SQLDriver: | ||
return r.setupSQLPublisher(ctx) | ||
default: | ||
return nil, nil, fmt.Errorf("unknown publisher type: %s", r.cfg.EventConfig.Driver) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm nervous about having this diverge from internal/events/events.go
. Can we use an eventer.Interface
from eventer.New()
here?
(I need to do some refactoring in that package to move eventer/interface
into eventer
, but I'll wait until this code is landed to reduce the amount of conflicts.)
|
||
// NatsConfig is the configuration when using NATS as the event driver | ||
type NatsConfig struct { | ||
// URL is the URL for the NATS server | ||
URL string `mapstructure:"url" default:"nats://localhost:4222"` | ||
// Prefix is the prefix for the NATS subjects to subscribe to | ||
Prefix string `mapstructure:"prefix" default:"minder"` | ||
// Queue is the name of the queue group to join when consuming messages | ||
// queue groups allow multiple process to round-robin process messages. | ||
Queue string `mapstructure:"queue" default:"minder"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to move this up to common
(rather than just having a split between client
and server
configuration), let's put it in its own file. It feels kind of peculiar to have this in common
while having the rest of the event configuration still in server
, so I'd argue for moving the code back, particularly in the context of having smaller, more focused PRs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of moving the reminder
configuration into the same package as server
, and calling them all "server-side components`? It feels like it would allow for more sharing between the two implementations.
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ | ||
Name: c.cfg.Prefix, | ||
Subjects: []string{c.cfg.Prefix + ".>"}, | ||
}) | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 if this works; I think when I started looking, this wasn't cleaned up / operable with CloudEvents yet.
Summary
Provide a brief overview of the changes and the issue being addressed.
Explain the rationale and any background necessary for understanding the changes.
List dependencies required by this change, if any.
Add NATS as an option for
reminder
to publish events.Change Type
Mark the type of change your PR introduces:
Testing
Outline how the changes were tested, including steps to reproduce and any relevant configurations.
Attach screenshots if helpful.
Tested locally, working fine (
nats stream view
output):Review Checklist: