Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust consumer creation to nats-server v2.9.0 #1080

Merged
merged 7 commits into from Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
131 changes: 53 additions & 78 deletions js.go
Expand Up @@ -122,9 +122,19 @@ const (
apiAccountInfo = "INFO"

// apiConsumerCreateT is used to create consumers.
apiConsumerCreateT = "CONSUMER.CREATE.%s"
// it accepts stream name and consumer name.
apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"

// apiConsumerCreateT is used to create consumers.
// it accepts stream name, consumer name and filter subject
apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"

// apiLegacyConsumerCreateT is used to create consumers.
// this is a legacy endpoint to support creating ephemerals before nats-server v2.9.0.
apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s"

// apiDurableCreateT is used to create durable consumers.
// this is a legacy endpoint to support creating durable consumers before nats-server v2.9.0.
apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"

// apiConsumerInfoT is used to create consumers.
Expand Down Expand Up @@ -1031,6 +1041,7 @@ func (d nakDelay) configureAck(opts *ackOpts) error {
// ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
Expand Down Expand Up @@ -1621,95 +1632,59 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// If we are creating or updating let's process that request.
if shouldCreate {
j, err := json.Marshal(ccreq)
if err != nil {
cleanUpSub()
return nil, err
}

var ccSubj string
if isDurable {
ccSubj = js.apiSubj(fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable))
} else {
ccSubj = js.apiSubj(fmt.Sprintf(apiConsumerCreateT, stream))
}

if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(ccSubj, j)
}
}
resp, err := nc.Request(ccSubj, j, js.opts.wait)
info, err := js.upsertConsumer(stream, cfg.Durable, ccreq.Config)
if err != nil {
cleanUpSub()
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
var apiErr *APIError
if ok := errors.As(err, &apiErr); !ok {
cleanUpSub()
return nil, err
}
return nil, err
}
if js.opts.shouldTrace {
ctrace := js.opts.ctrace
if ctrace.ResponseReceived != nil {
ctrace.ResponseReceived(ccSubj, resp.Data, resp.Header)
if consumer == _EMPTY_ ||
(apiErr.ErrorCode != JSErrCodeConsumerAlreadyExists && apiErr.ErrorCode != JSErrCodeConsumerNameExists) {
cleanUpSub()
if errors.Is(apiErr, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, err
}
}

var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
cleanUpSub()
return nil, err
}
info = cinfo.ConsumerInfo

if cinfo.Error != nil {
// We will not be using this sub here if we were push based.
if !isPullMode {
cleanUpSub()
}
if consumer != _EMPTY_ &&
(cinfo.Error.ErrorCode == JSErrCodeConsumerAlreadyExists || cinfo.Error.ErrorCode == JSErrCodeConsumerNameExists) {

info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}
info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
return nil, err
}
deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue)
if err != nil {
return nil, err
}

if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if errors.Is(cinfo.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
return nil, cinfo.Error
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
Expand Down Expand Up @@ -1960,7 +1935,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
cfg.DeliverPolicy = DeliverByStartSequencePolicy
cfg.OptStartSeq = sseq

ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream)
ccSubj := fmt.Sprintf(apiLegacyConsumerCreateT, jsi.stream)
j, err := json.Marshal(jsi.ccreq)
js := jsi.js
sub.mu.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions jserrors.go
Expand Up @@ -30,7 +30,7 @@ var (
// ErrStreamNotFound is an error returned when stream with given name does not exist.
ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
Expand Down Expand Up @@ -59,7 +59,7 @@ var (
// ErrStreamNameRequired is returned when the provided stream name is empty.
ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

// ErrConsumerNameRequired is returned when the provided consumer durable name is empty,
// ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
Expand Down Expand Up @@ -98,8 +98,8 @@ var (
// ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set.
ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases
// Use ErrInvalidConsumerName instead
// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
)

Expand Down
43 changes: 31 additions & 12 deletions jsm.go
Expand Up @@ -259,34 +259,45 @@ type consumerResponse struct {

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg != nil && cfg.Durable != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, cfg.Durable)
if cfg == nil {
cfg = &ConsumerConfig{}
}
consumerName := cfg.Name
if consumerName == _EMPTY_ {
consumerName = cfg.Durable
}
if consumerName != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, consumerName)
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}

if consInfo != nil {
sameConfig := checkConfig(&consInfo.Config, cfg)
if sameConfig != nil {
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream)
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, consumerName, stream)
}
}
}

return js.upsertConsumer(stream, cfg, opts...)
return js.upsertConsumer(stream, consumerName, cfg, opts...)
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
consumerName := cfg.Name
if consumerName == _EMPTY_ {
consumerName = cfg.Durable
}
if consumerName == _EMPTY_ {
return nil, ErrConsumerNameRequired
}
return js.upsertConsumer(stream, cfg, opts...)
return js.upsertConsumer(stream, consumerName, cfg, opts...)
}

func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
Expand All @@ -304,13 +315,21 @@ func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt)
}

var ccSubj string
if cfg != nil && cfg.Durable != _EMPTY_ {
if err := checkConsumerName(cfg.Durable); err != nil {
return nil, err
if consumerName == _EMPTY_ {
// if consumer name is empty, use the legacy ephemeral endpoint
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
} else if err := checkConsumerName(consumerName); err != nil {
return nil, err
} else if js.nc.serverMinVersion(2, 9, 0) {
// if above server version 2.9.0, use the endpoints with consumer name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not checking the server name, what I think should be done is just in checking if the new config Name is specified. If it is, then that signals the intent to use the new API endpoints. If user specifies Name, but connects to an older server, they will get a timeout: this is not ideal, but I don't think checking server version is reliable anyway (could be connected to one 2.9.0, but server accepting the request be older version, or vice versa).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I'll change it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, just to clarify - when user provides only Durable, we should use the old endpoints (CONSUMER.DURABLE.CREATE)?
If so, that makes it problematic for e.g. Subscribe("foo", nats.Durable("cons")), as basically it will always use old API subject, unless we add another SubOpt for just setting consumer name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should then possibly set the Name to the Durable before calling js.AddConsumer(). But then, I agree that we have the situation where this is not a user choice and so connecting to older server would be a problem...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So from what I understand, we have following options:

  1. Not verify the server version compatibility and call the new API if Name is set - problem here is, Subscribe() will always call the old API when combined with Bind()/Durable() options
  2. Do as above, but set Name to the value of Durable in Subscribe() - that means however that we would always be using new API, even for older servers (that's not an option)
  3. Verify server version when choosing the right subject in upsertConsumer() - here, we might have an issue of connecting to an older server, as you described in the original comment.

Looking at those, I would still lean towards option 3 - I could strip version check in AddConsumer()/UpdateConsumer(), but leave the version check when choosing the right subject in upsertConsuper(). What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pure version orientated approach means users do not get a chance to say they need time to update infrastructure like ACLs and so forth to start using it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, not sure how big of an issue that would be vs forcing the user to intentionally start using the new API by changing their application code (IMO this transition should ideally be seamless, as the user of client library I don't need to care about the API subjects). What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a class of user who care deeply to lock down the APIs according to needs. Also a class of user who have spent considerable time in navigating the API subjects for cross account use via imports and exports.

A major design feature of these API subjects is to enable that lock down. Or to be selectively imported to manage permissions and restrictions.

The Venn diagrams of users likely to pay for NATS and those who care deeply for subject security probsbly has quite a lot of overlap.

So you might not care, but I think we should consider if introducing new features in the most user hostile way possible is perhaps not the right thing - especially considering the users most likely affected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with the ACL is possibly real, but in reality, the same logic can be applied the other way. If they deploy the new client but don't update the servers, they can update their ACLs and then deploy the servers.

The client possibly could have a way of rejecting the use of the new API (I do for test purposes). But at some point, the clients become too complex and too flexible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is how do users know? Do we have effective communication channels to tell people about these changes? Release notes and blog post are notoriously ineffective - especially as ours tend to be enormous.

So do we feel users are served well enough by the communication and warning we give them about upcoming changes?

In another world these would be considered breaking changes and tooling and just human behaviour is aware of major changes. These are not just new features. They majorly change existing code simply because it happens to point at another version server.

As much as I loathe the go major version change behaviour this does demonstrate the utility of that.

if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
}
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
} else {
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
// if consumer name is not empty and the server version is lower than 2.9.0, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
}

resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
Expand Down