Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust consumer creation to nats-server v2.9.0 #1080

Merged
merged 7 commits into from Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions js.go
Expand Up @@ -255,6 +255,9 @@ type jsOpts struct {
directGet bool
// For direct get next message
directNextFor string

// featureFlags are used to enable/disable specific JetStream features
featureFlags featureFlags
}

const (
Expand Down Expand Up @@ -294,6 +297,20 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}

type featureFlags struct {
useDurableConsumerCreate bool
}

// UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation.
// If this option is used when creating JetStremContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used
// to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
func UseLegacyDurableConsumers() JSOpt {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good compromise to me, allows people to opt out of a feature should they be in a situation where server updates isnt possible.

Of course in other projects this is opt in -> opt out -> remove flag over 3 releases. But seems this is the best we can do at present.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue with this is that the default client behavior is still to use the new feature. So really they have to change code, so they could also hold upgrading the server or the client. If there's a flag, my guess is it should be opt-in, rather than opt-out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference would be an opt-in also.

To your point "they could also hold upgrading the server"....this is just not the case, our default answer to any support question is "upgrade the server", users might not be in a position to cange code etc, hence why I prefer opt-in.

Copy link
Member

@wallyqs wallyqs Sep 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the way these type of changes are rolled out is to wait for a few releases before making the default if there is no alternative, I like the feature flags approach from the PR, maybe we could have eventually an option for the JetStream context to always use latest features instead or to enable a subset of them.

nc.JetStream(nats.UseLatestJSFeatures()) // all opt-in
nc.JetStream(nats.UseLegacyDurableConsumers()) // opt-out 
nc.JetStream(&nats.JetStreamFeatures{LegacyDurableConsumers: true, AllowDirect: true})

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UseLatestJSFeatures() would be a nice addition in the future for sure. I would probably lean towards merging this PR as is (with having the opt-out option if needed, to be deprecated and removed in future releases), not to change the behavior vs other clients right now. If we decide we want to have future features like this opt-in, we need to be consistent across all clients. As some of the clients are already released without a feature flag, I don't think it's good to have this discrepancy.

return jsOptFn(func(opts *jsOpts) error {
opts.featureFlags.useDurableConsumerCreate = true
return nil
})
}

// ClientTrace can be used to trace API interactions for the JetStream Context.
type ClientTrace struct {
RequestSent func(subj string, payload []byte)
Expand Down
8 changes: 4 additions & 4 deletions jsm.go
Expand Up @@ -320,16 +320,16 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
} else if err := checkConsumerName(consumerName); err != nil {
return nil, err
} else if js.nc.serverMinVersion(2, 9, 0) {
} else if !js.nc.serverMinVersion(2, 9, 0) || (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) {
// if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else {
// if above server version 2.9.0, use the endpoints with consumer name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not checking the server name, what I think should be done is just in checking if the new config Name is specified. If it is, then that signals the intent to use the new API endpoints. If user specifies Name, but connects to an older server, they will get a timeout: this is not ideal, but I don't think checking server version is reliable anyway (could be connected to one 2.9.0, but server accepting the request be older version, or vice versa).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I'll change it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, just to clarify - when user provides only Durable, we should use the old endpoints (CONSUMER.DURABLE.CREATE)?
If so, that makes it problematic for e.g. Subscribe("foo", nats.Durable("cons")), as basically it will always use old API subject, unless we add another SubOpt for just setting consumer name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should then possibly set the Name to the Durable before calling js.AddConsumer(). But then, I agree that we have the situation where this is not a user choice and so connecting to older server would be a problem...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So from what I understand, we have following options:

  1. Not verify the server version compatibility and call the new API if Name is set - problem here is, Subscribe() will always call the old API when combined with Bind()/Durable() options
  2. Do as above, but set Name to the value of Durable in Subscribe() - that means however that we would always be using new API, even for older servers (that's not an option)
  3. Verify server version when choosing the right subject in upsertConsumer() - here, we might have an issue of connecting to an older server, as you described in the original comment.

Looking at those, I would still lean towards option 3 - I could strip version check in AddConsumer()/UpdateConsumer(), but leave the version check when choosing the right subject in upsertConsuper(). What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pure version orientated approach means users do not get a chance to say they need time to update infrastructure like ACLs and so forth to start using it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, not sure how big of an issue that would be vs forcing the user to intentionally start using the new API by changing their application code (IMO this transition should ideally be seamless, as the user of client library I don't need to care about the API subjects). What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a class of user who care deeply to lock down the APIs according to needs. Also a class of user who have spent considerable time in navigating the API subjects for cross account use via imports and exports.

A major design feature of these API subjects is to enable that lock down. Or to be selectively imported to manage permissions and restrictions.

The Venn diagrams of users likely to pay for NATS and those who care deeply for subject security probsbly has quite a lot of overlap.

So you might not care, but I think we should consider if introducing new features in the most user hostile way possible is perhaps not the right thing - especially considering the users most likely affected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with the ACL is possibly real, but in reality, the same logic can be applied the other way. If they deploy the new client but don't update the servers, they can update their ACLs and then deploy the servers.

The client possibly could have a way of rejecting the use of the new API (I do for test purposes). But at some point, the clients become too complex and too flexible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is how do users know? Do we have effective communication channels to tell people about these changes? Release notes and blog post are notoriously ineffective - especially as ours tend to be enormous.

So do we feel users are served well enough by the communication and warning we give them about upcoming changes?

In another world these would be considered breaking changes and tooling and just human behaviour is aware of major changes. These are not just new features. They majorly change existing code simply because it happens to point at another version server.

As much as I loathe the go major version change behaviour this does demonstrate the utility of that.

if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
}
} else {
// if consumer name is not empty and the server version is lower than 2.9.0, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
}

resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
Expand Down
30 changes: 28 additions & 2 deletions test/js_test.go
Expand Up @@ -1459,6 +1459,32 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("legacy durable with jetstream context option", func(t *testing.T) {
jsLegacy, err := nc.JetStream(nats.UseLegacyDurableConsumers())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.foo.dlc-4")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer sub.Unsubscribe()
ci, err := jsLegacy.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc-4", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !strings.Contains(string(msg.Data), `"durable_name":"dlc-4"`) {
t.Fatalf("create consumer message is not correct: %q", string(msg.Data))
}
if ci == nil || ci.Config.Durable != "dlc-4" || ci.Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
})

t.Run("with invalid consumer name", func(t *testing.T) {
if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName {
t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err)
Expand Down Expand Up @@ -1562,7 +1588,7 @@ func TestJetStreamManagement(t *testing.T) {
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
if len(infos) != 5 || infos[0].Stream != "foo" {
if len(infos) != 6 || infos[0].Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
})
Expand All @@ -1574,7 +1600,7 @@ func TestJetStreamManagement(t *testing.T) {
for name := range js.ConsumerNames("foo", nats.Context(ctx)) {
names = append(names, name)
}
if got, want := len(names), 5; got != want {
if got, want := len(names), 6; got != want {
t.Fatalf("Unexpected names, got=%d, want=%d", got, want)
}
})
Expand Down