From 3c209f575fe6cadc4be86debd695c4d3b86b37a8 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Fri, 16 Sep 2022 17:46:40 +0200 Subject: [PATCH] [UPDATED] Adjust consumer creation to nats-server v2.9.0 (#1080) --- js.go | 148 +++++++++++++++---------------- jserrors.go | 8 +- jsm.go | 45 +++++++--- test/js_test.go | 229 ++++++++++++++++++++++++++++++++++++++---------- 4 files changed, 289 insertions(+), 141 deletions(-) diff --git a/js.go b/js.go index e0022015c..22ad5f34d 100644 --- a/js.go +++ b/js.go @@ -122,9 +122,19 @@ const ( apiAccountInfo = "INFO" // apiConsumerCreateT is used to create consumers. - apiConsumerCreateT = "CONSUMER.CREATE.%s" + // it accepts stream name and consumer name. + apiConsumerCreateT = "CONSUMER.CREATE.%s.%s" + + // apiConsumerCreateT is used to create consumers. + // it accepts stream name, consumer name and filter subject + apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s" + + // apiLegacyConsumerCreateT is used to create consumers. + // this is a legacy endpoint to support creating ephemerals before nats-server v2.9.0. + apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s" // apiDurableCreateT is used to create durable consumers. + // this is a legacy endpoint to support creating durable consumers before nats-server v2.9.0. apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s" // apiConsumerInfoT is used to create consumers. @@ -245,6 +255,9 @@ type jsOpts struct { directGet bool // For direct get next message directNextFor string + + // featureFlags are used to enable/disable specific JetStream features + featureFlags featureFlags } const ( @@ -284,6 +297,20 @@ func (opt jsOptFn) configureJSContext(opts *jsOpts) error { return opt(opts) } +type featureFlags struct { + useDurableConsumerCreate bool +} + +// UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation. +// If this option is used when creating JetStremContext, $JS.API.CONSUMER.DURABLE.CREATE.. will be used +// to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE... +func UseLegacyDurableConsumers() JSOpt { + return jsOptFn(func(opts *jsOpts) error { + opts.featureFlags.useDurableConsumerCreate = true + return nil + }) +} + // ClientTrace can be used to trace API interactions for the JetStream Context. type ClientTrace struct { RequestSent func(subj string, payload []byte) @@ -1031,6 +1058,7 @@ func (d nakDelay) configureAck(opts *ackOpts) error { // ConsumerConfig is the configuration of a JetStream consumer. type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` + Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` @@ -1624,95 +1652,59 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // If we are creating or updating let's process that request. if shouldCreate { - j, err := json.Marshal(ccreq) + info, err := js.upsertConsumer(stream, cfg.Durable, ccreq.Config) if err != nil { - cleanUpSub() - return nil, err - } - - var ccSubj string - if isDurable { - ccSubj = js.apiSubj(fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)) - } else { - ccSubj = js.apiSubj(fmt.Sprintf(apiConsumerCreateT, stream)) - } - - if js.opts.shouldTrace { - ctrace := js.opts.ctrace - if ctrace.RequestSent != nil { - ctrace.RequestSent(ccSubj, j) - } - } - resp, err := nc.Request(ccSubj, j, js.opts.wait) - if err != nil { - cleanUpSub() - if err == ErrNoResponders { - err = ErrJetStreamNotEnabled + var apiErr *APIError + if ok := errors.As(err, &apiErr); !ok { + cleanUpSub() + return nil, err } - return nil, err - } - if js.opts.shouldTrace { - ctrace := js.opts.ctrace - if ctrace.ResponseReceived != nil { - ctrace.ResponseReceived(ccSubj, resp.Data, resp.Header) + if consumer == _EMPTY_ || + (apiErr.ErrorCode != JSErrCodeConsumerAlreadyExists && apiErr.ErrorCode != JSErrCodeConsumerNameExists) { + cleanUpSub() + if errors.Is(apiErr, ErrStreamNotFound) { + return nil, ErrStreamNotFound + } + return nil, err } - } - - var cinfo consumerResponse - err = json.Unmarshal(resp.Data, &cinfo) - if err != nil { - cleanUpSub() - return nil, err - } - info = cinfo.ConsumerInfo - - if cinfo.Error != nil { // We will not be using this sub here if we were push based. if !isPullMode { cleanUpSub() } - if consumer != _EMPTY_ && - (cinfo.Error.ErrorCode == JSErrCodeConsumerAlreadyExists || cinfo.Error.ErrorCode == JSErrCodeConsumerNameExists) { - info, err = js.ConsumerInfo(stream, consumer) - if err != nil { - return nil, err - } - deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue) - if err != nil { - return nil, err - } + info, err = js.ConsumerInfo(stream, consumer) + if err != nil { + return nil, err + } + deliver, err = processConsInfo(info, o.cfg, isPullMode, subj, queue) + if err != nil { + return nil, err + } - if !isPullMode { - // We can't reuse the channel, so if one was passed, we need to create a new one. - if isSync { - ch = make(chan *Msg, cap(ch)) - } else if ch != nil { - // User provided (ChanSubscription), simply try to drain it. - for done := false; !done; { - select { - case <-ch: - default: - done = true - } + if !isPullMode { + // We can't reuse the channel, so if one was passed, we need to create a new one. + if isSync { + ch = make(chan *Msg, cap(ch)) + } else if ch != nil { + // User provided (ChanSubscription), simply try to drain it. + for done := false; !done; { + select { + case <-ch: + default: + done = true } } - jsi.deliver = deliver - jsi.hbi = info.Config.Heartbeat - - // Recreate the subscription here. - sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) - if err != nil { - return nil, err - } - hasFC = info.Config.FlowControl - hasHeartbeats = info.Config.Heartbeat > 0 } - } else { - if errors.Is(cinfo.Error, ErrStreamNotFound) { - return nil, ErrStreamNotFound + jsi.deliver = deliver + jsi.hbi = info.Config.Heartbeat + + // Recreate the subscription here. + sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) + if err != nil { + return nil, err } - return nil, cinfo.Error + hasFC = info.Config.FlowControl + hasHeartbeats = info.Config.Heartbeat > 0 } } else { // Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain() @@ -1963,7 +1955,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { cfg.DeliverPolicy = DeliverByStartSequencePolicy cfg.OptStartSeq = sseq - ccSubj := fmt.Sprintf(apiConsumerCreateT, jsi.stream) + ccSubj := fmt.Sprintf(apiLegacyConsumerCreateT, jsi.stream) j, err := json.Marshal(jsi.ccreq) js := jsi.js sub.mu.Unlock() diff --git a/jserrors.go b/jserrors.go index 876278644..a97344c0c 100644 --- a/jserrors.go +++ b/jserrors.go @@ -30,7 +30,7 @@ var ( // ErrStreamNotFound is an error returned when stream with given name does not exist. ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}} - // ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration + // ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration. ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}} // ErrConsumerNotFound is an error returned when consumer with given name does not exist. @@ -59,7 +59,7 @@ var ( // ErrStreamNameRequired is returned when the provided stream name is empty. ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"} - // ErrConsumerNameRequired is returned when the provided consumer durable name is empty, + // ErrConsumerNameRequired is returned when the provided consumer durable name is empty. ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"} // ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer. @@ -98,8 +98,8 @@ var ( // ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set. ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"} - // DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases - // Use ErrInvalidConsumerName instead + // DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases. + // Use ErrInvalidConsumerName instead. ErrInvalidDurableName = errors.New("nats: invalid durable name") ) diff --git a/jsm.go b/jsm.go index 078c004bc..0bdbca272 100644 --- a/jsm.go +++ b/jsm.go @@ -267,8 +267,15 @@ type consumerResponse struct { // AddConsumer will add a JetStream consumer. func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { - if cfg != nil && cfg.Durable != _EMPTY_ { - consInfo, err := js.ConsumerInfo(stream, cfg.Durable) + if cfg == nil { + cfg = &ConsumerConfig{} + } + consumerName := cfg.Name + if consumerName == _EMPTY_ { + consumerName = cfg.Durable + } + if consumerName != _EMPTY_ { + consInfo, err := js.ConsumerInfo(stream, consumerName) if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) { return nil, err } @@ -276,25 +283,29 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C if consInfo != nil { sameConfig := checkConfig(&consInfo.Config, cfg) if sameConfig != nil { - return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream) + return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, consumerName, stream) } } } - return js.upsertConsumer(stream, cfg, opts...) + return js.upsertConsumer(stream, consumerName, cfg, opts...) } func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { if cfg == nil { return nil, ErrConsumerConfigRequired } - if cfg.Durable == _EMPTY_ { + consumerName := cfg.Name + if consumerName == _EMPTY_ { + consumerName = cfg.Durable + } + if consumerName == _EMPTY_ { return nil, ErrConsumerNameRequired } - return js.upsertConsumer(stream, cfg, opts...) + return js.upsertConsumer(stream, consumerName, cfg, opts...) } -func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { +func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { if err := checkStreamName(stream); err != nil { return nil, err } @@ -312,13 +323,21 @@ func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) } var ccSubj string - if cfg != nil && cfg.Durable != _EMPTY_ { - if err := checkConsumerName(cfg.Durable); err != nil { - return nil, err - } - ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable) + if consumerName == _EMPTY_ { + // if consumer name is empty, use the legacy ephemeral endpoint + ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream) + } else if err := checkConsumerName(consumerName); err != nil { + return nil, err + } else if !js.nc.serverMinVersion(2, 9, 0) || (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) { + // if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint + ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName) } else { - ccSubj = fmt.Sprintf(apiConsumerCreateT, stream) + // if above server version 2.9.0, use the endpoints with consumer name + if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" { + ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName) + } else { + ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject) + } } resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req) diff --git a/test/js_test.go b/test/js_test.go index ee450d396..e2a448e04 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1264,7 +1264,7 @@ func TestJetStreamManagement(t *testing.T) { // Create the stream using our client API. var si *nats.StreamInfo t.Run("create stream", func(t *testing.T) { - si, err := js.AddStream(&nats.StreamConfig{Name: "foo"}) + si, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"foo", "bar"}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1346,51 +1346,186 @@ func TestJetStreamManagement(t *testing.T) { }) t.Run("create consumer", func(t *testing.T) { - ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" { - t.Fatalf("ConsumerInfo is not correct %+v", ci) - } + t.Run("with durable set", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"durable_name":"dlc"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) + t.Run("with name set", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc-1") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc-1", AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"durable_name":"dlc-1"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Name != "dlc-1" || ci.Stream != "foo" { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) - if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName { - t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) - } - }) + t.Run("with same Durable and Name set", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc-2") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc-2", Name: "dlc-2", AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"durable_name":"dlc-2"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Name != "dlc-2" || ci.Stream != "foo" { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) - t.Run("consumer with given name already exists", func(t *testing.T) { - // configs do not match - if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckAllPolicy}); !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) { - t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerNameAlreadyInUse, err) - } + t.Run("with name and filter subject", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.dlc-3.foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc-3", + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: "foo", + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"durable_name":"dlc-3"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Name != "dlc-3" || ci.Stream != "foo" || ci.Config.FilterSubject != "foo" { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) - // configs are the same - if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}); err != nil { - t.Fatalf("Expected no error; got: %v", err) - } - }) + t.Run("legacy ephemeral consumer without name", func(t *testing.T) { + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"stream_name":"foo"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Config.Durable != "" || ci.Stream != "foo" { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) - t.Run("create consumer on missing stream", func(t *testing.T) { - _, err = js.AddConsumer("missing", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) - if err != nats.ErrStreamNotFound { - t.Fatalf("Expected stream not found error, got: %v", err) - } - }) + t.Run("legacy durable with jetstream context option", func(t *testing.T) { + jsLegacy, err := nc.JetStream(nats.UseLegacyDurableConsumers()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.foo.dlc-4") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer sub.Unsubscribe() + ci, err := jsLegacy.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc-4", AckPolicy: nats.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !strings.Contains(string(msg.Data), `"durable_name":"dlc-4"`) { + t.Fatalf("create consumer message is not correct: %q", string(msg.Data)) + } + if ci == nil || ci.Config.Durable != "dlc-4" || ci.Stream != "foo" { + t.Fatalf("ConsumerInfo is not correct %+v", ci) + } + }) - t.Run("create consumer check params", func(t *testing.T) { - _, err = js.AddConsumer("", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) - if err != nats.ErrStreamNameRequired { - t.Fatalf("Expected %v, got: %v", nats.ErrStreamNameRequired, err) - } - _, err = js.AddConsumer("bad.stream.name", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) - if err != nats.ErrInvalidStreamName { - t.Fatalf("Expected %v, got: %v", nats.ErrInvalidStreamName, err) - } - _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "bad.consumer.name", AckPolicy: nats.AckExplicitPolicy}) - if err != nats.ErrInvalidConsumerName { - t.Fatalf("Expected %v, got: %v", nats.ErrInvalidConsumerName, err) - } + t.Run("with invalid consumer name", func(t *testing.T) { + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) + } + }) + + t.Run("consumer with given name already exists, configs do not match", func(t *testing.T) { + // configs do not match + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckAllPolicy}); !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) { + t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerNameAlreadyInUse, err) + } + }) + + t.Run("consumer with given name already exists, configs are the same", func(t *testing.T) { + // configs are the same + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}); err != nil { + t.Fatalf("Expected no error; got: %v", err) + } + }) + + t.Run("stream does not exist", func(t *testing.T) { + _, err = js.AddConsumer("missing", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + if err != nats.ErrStreamNotFound { + t.Fatalf("Expected stream not found error, got: %v", err) + } + }) + + t.Run("params validation error", func(t *testing.T) { + _, err = js.AddConsumer("", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + if err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got: %v", nats.ErrStreamNameRequired, err) + } + _, err = js.AddConsumer("bad.stream.name", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + if err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got: %v", nats.ErrInvalidStreamName, err) + } + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "bad.consumer.name", AckPolicy: nats.AckExplicitPolicy}) + if err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected %v, got: %v", nats.ErrInvalidConsumerName, err) + } + }) }) t.Run("consumer info", func(t *testing.T) { @@ -1453,7 +1588,7 @@ func TestJetStreamManagement(t *testing.T) { for info := range js.Consumers("foo") { infos = append(infos, info) } - if len(infos) != 1 || infos[0].Stream != "foo" || infos[0].Config.Durable != "dlc" { + if len(infos) != 6 || infos[0].Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", infos) } }) @@ -1465,7 +1600,7 @@ func TestJetStreamManagement(t *testing.T) { for name := range js.ConsumerNames("foo", nats.Context(ctx)) { names = append(names, name) } - if got, want := len(names), 1; got != want { + if got, want := len(names), 6; got != want { t.Fatalf("Unexpected names, got=%d, want=%d", got, want) } }) @@ -1517,8 +1652,9 @@ func TestJetStreamManagement(t *testing.T) { if err != nats.ErrInvalidStreamName { t.Fatalf("Expected stream name required error, got %v", err) } - // Check that durable name is required + // Check that consumer name is required expected.Durable = "" + expected.Name = "" _, err = js.UpdateConsumer("foo", &expected) if err != nats.ErrConsumerNameRequired { t.Fatalf("Expected consumer name required error, got %v", err) @@ -1527,7 +1663,7 @@ func TestJetStreamManagement(t *testing.T) { expected.Durable = "bad.consumer.name" _, err = js.UpdateConsumer("foo", &expected) if err != nats.ErrInvalidConsumerName { - t.Fatalf("Expected consumer name required error, got %v", err) + t.Fatalf("Expected invalid consumer name error, got %v", err) } expected.Durable = "update_push_consumer" @@ -1542,6 +1678,7 @@ func TestJetStreamManagement(t *testing.T) { if err != nil { t.Fatalf("Error on update: %v", err) } + expected.Name = "update_push_consumer" if !reflect.DeepEqual(ci.Config, expected) { t.Fatalf("Expected config to be %+v, got %+v", expected, ci.Config) } @@ -7710,7 +7847,7 @@ func TestJetStreamConsumerConfigReplicasAndMemStorage(t *testing.T) { // We can't really check if the consumer ends-up with memory storage or not. // We are simply going to create a NATS subscription on the request subject // and make sure that the request contains "mem_storage:true". - sub, err := nc.SubscribeSync("$JS.API.CONSUMER.DURABLE.CREATE.CR.dur") + sub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.CR.dur") if err != nil { t.Fatalf("Error on subscribe: %v", err) }