From 4d7a668a9563d7fdfca55ae2c3f59096dce988da Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 11 Aug 2022 17:58:52 +0200 Subject: [PATCH 1/2] Use JetStream error codes, extract ErrConsumerNameAlredyInUse --- js.go | 40 +++++++--------- jsm.go | 122 +++++++++++++++++++++++++++++++----------------- nats.go | 21 ++++----- test/js_test.go | 35 +++++++++----- 4 files changed, 131 insertions(+), 87 deletions(-) diff --git a/js.go b/js.go index 77265854b..c52b44a55 100644 --- a/js.go +++ b/js.go @@ -524,7 +524,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { return nil, ErrInvalidJSAck } if pa.Error != nil { - return nil, fmt.Errorf("nats: %s", pa.Error.Description) + return nil, pa.Error } if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ { return nil, ErrInvalidJSAck @@ -728,7 +728,7 @@ func (js *js) handleAsyncReply(m *Msg) { return } if pa.Error != nil { - doErr(fmt.Errorf("nats: %s", pa.Error.Description)) + doErr(pa.Error) return } if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ { @@ -1228,7 +1228,10 @@ func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOp // See important note in Subscribe() func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) { mch := make(chan *Msg, js.nc.Opts.SubChanLen) - return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable))) + if durable != "" { + opts = append(opts, Durable(durable)) + } + return js.subscribe(subj, _EMPTY_, nil, mch, true, true, opts) } func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) { @@ -1394,7 +1397,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // If this is a queue subscription and no consumer nor durable name was specified, // then we will use the queue name as a durable name. if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ { - if err := checkDurName(queue); err != nil { + if err := checkConsumerName(queue); err != nil { return nil, err } o.cfg.Durable = queue @@ -1651,8 +1654,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, cleanUpSub() } if consumer != _EMPTY_ && - (strings.Contains(cinfo.Error.Description, `consumer already exists`) || - strings.Contains(cinfo.Error.Description, `consumer name already in use`)) { + (cinfo.Error.ErrorCode == ConsumerAlreadyExists || cinfo.Error.ErrorCode == ConsumerNameExists) { info, err = js.ConsumerInfo(stream, consumer) if err != nil { @@ -1689,10 +1691,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, hasHeartbeats = info.Config.Heartbeat > 0 } } else { - if cinfo.Error.Code == 404 { + if cinfo.Error.ErrorCode == StreamNotFound { return nil, ErrStreamNotFound } - return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) + return nil, cinfo.Error } } else { // Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain() @@ -1970,7 +1972,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { } if cinfo.Error != nil { - pushErr(fmt.Errorf("nats: %s", cinfo.Error.Description)) + pushErr(cinfo.Error) return } @@ -2185,17 +2187,8 @@ func Description(description string) SubOpt { }) } -// Check that the durable name is valid, that is, that it does not contain -// any ".", and if it does return ErrInvalidDurableName, otherwise nil. -func checkDurName(dur string) error { - if strings.Contains(dur, ".") { - return ErrInvalidDurableName - } - return nil -} - // Durable defines the consumer name for JetStream durable subscribers. -// This function will return ErrInvalidDurableName in the name contains +// This function will return ErrInvalidConsumerName in the name contains // any dot ".". func Durable(consumer string) SubOpt { return subOptFn(func(opts *subOpts) error { @@ -2205,7 +2198,7 @@ func Durable(consumer string) SubOpt { if opts.consumer != _EMPTY_ && opts.consumer != consumer { return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer) } - if err := checkDurName(consumer); err != nil { + if err := checkConsumerName(consumer); err != nil { return err } @@ -2757,10 +2750,13 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin return nil, err } if info.Error != nil { - if info.Error.Code == 404 { + if info.Error.ErrorCode == ConsumerNotFound { return nil, ErrConsumerNotFound } - return nil, fmt.Errorf("nats: %s", info.Error.Description) + if info.Error.ErrorCode == StreamNotFound { + return nil, ErrStreamNotFound + } + return nil, info.Error } return info.ConsumerInfo, nil } diff --git a/jsm.go b/jsm.go index b2cfe6e17..fa7457cf7 100644 --- a/jsm.go +++ b/jsm.go @@ -156,9 +156,9 @@ type ExternalStream struct { // APIError is included in all API responses if there was an error. type APIError struct { - Code int `json:"code"` - ErrorCode int `json:"err_code"` - Description string `json:"description,omitempty"` + Code int `json:"code"` + ErrorCode ErrorCode `json:"err_code"` + Description string `json:"description,omitempty"` } // apiResponse is a standard response from the JetStream JSON API @@ -219,6 +219,26 @@ type accountInfoResponse struct { AccountInfo } +type ErrorCode uint16 + +const ( + JetStreamNotEnabledForAccount ErrorCode = 10039 + + StreamNotFound ErrorCode = 10059 + StreamNameInUse ErrorCode = 10058 + + ConsumerNotFound ErrorCode = 10014 + ConsumerNameExists ErrorCode = 10013 + ConsumerAlreadyExists ErrorCode = 10105 + + MessageNotFound ErrorCode = 10037 +) + +// Error prints the JetStream API error code and description +func (e *APIError) Error() string { + return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description) +} + // AccountInfo retrieves info about the JetStream usage from the current account. // If JetStream is not enabled, this will return ErrJetStreamNotEnabled // Other errors can happen but are generally considered retryable @@ -244,13 +264,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { return nil, err } if info.Error != nil { - var err error - if strings.Contains(info.Error.Description, "not enabled for") { - err = ErrJetStreamNotEnabled - } else { - err = errors.New(info.Error.Description) + if info.Error.ErrorCode == JetStreamNotEnabledForAccount { + return nil, ErrJetStreamNotEnabled } - return nil, err + return nil, info.Error } return &info.AccountInfo, nil @@ -268,6 +285,30 @@ type consumerResponse struct { // AddConsumer will add a JetStream consumer. func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { + if cfg != nil && cfg.Durable != _EMPTY_ { + consInfo, err := js.ConsumerInfo(stream, cfg.Durable) + if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) { + return nil, err + } + if consInfo != nil { + return nil, fmt.Errorf("creating consumer %q on stream %q: %w", cfg.Durable, stream, ErrConsumerNameAlreadyInUse) + } + } + + return js.upsertConsumer(stream, cfg, opts...) +} + +func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { + if cfg == nil { + return nil, ErrConsumerConfigRequired + } + if cfg.Durable == _EMPTY_ { + return nil, ErrConsumerNameRequired + } + return js.upsertConsumer(stream, cfg, opts...) +} + +func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { if err := checkStreamName(stream); err != nil { return nil, err } @@ -286,7 +327,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C var ccSubj string if cfg != nil && cfg.Durable != _EMPTY_ { - if err := checkDurName(cfg.Durable); err != nil { + if err := checkConsumerName(cfg.Durable); err != nil { return nil, err } ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable) @@ -307,30 +348,17 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C return nil, err } if info.Error != nil { - if info.Error.ErrorCode == 10059 { + if info.Error.ErrorCode == StreamNotFound { return nil, ErrStreamNotFound } - if info.Error.Code == 404 { + if info.Error.ErrorCode == ConsumerNotFound { return nil, ErrConsumerNotFound } - return nil, errors.New(info.Error.Description) + return nil, info.Error } return info.ConsumerInfo, nil } -func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { - if err := checkStreamName(stream); err != nil { - return nil, err - } - if cfg == nil { - return nil, ErrConsumerConfigRequired - } - if cfg.Durable == _EMPTY_ { - return nil, ErrInvalidDurableName - } - return js.AddConsumer(stream, cfg, opts...) -} - // consumerDeleteResponse is the response for a Consumer delete request. type consumerDeleteResponse struct { apiResponse @@ -347,6 +375,8 @@ func checkStreamName(stream string) error { return nil } +// Check that the durable name exists and is valid, that is, that it does not contain any "." +// Returns ErrConsumerNameRequired if consumer name is empty, ErrInvalidConsumerName is invalid, otherwise nil func checkConsumerName(consumer string) error { if consumer == _EMPTY_ { return ErrConsumerNameRequired @@ -384,10 +414,10 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { } if resp.Error != nil { - if resp.Error.Code == 404 { + if resp.Error.ErrorCode == ConsumerNotFound { return ErrConsumerNotFound } - return errors.New(resp.Error.Description) + return resp.Error } return nil } @@ -474,7 +504,7 @@ func (c *consumerLister) Next() bool { return false } if resp.Error != nil { - c.err = errors.New(resp.Error.Description) + c.err = resp.Error return false } @@ -571,7 +601,7 @@ func (c *consumerNamesLister) Next() bool { return false } if resp.Error != nil { - c.err = errors.New(resp.Error.Description) + c.err = resp.Error return false } @@ -655,10 +685,10 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { return nil, err } if resp.Error != nil { - if resp.Error.ErrorCode == 10058 { + if resp.Error.ErrorCode == StreamNameInUse { return nil, ErrStreamNameAlreadyInUse } - return nil, errors.New(resp.Error.Description) + return nil, resp.Error } return resp.StreamInfo, nil @@ -703,10 +733,10 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { return nil, err } if resp.Error != nil { - if resp.Error.Code == 404 { + if resp.Error.ErrorCode == StreamNotFound { return nil, ErrStreamNotFound } - return nil, fmt.Errorf("nats: %s", resp.Error.Description) + return nil, resp.Error } return resp.StreamInfo, nil @@ -795,7 +825,10 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error return nil, err } if resp.Error != nil { - return nil, errors.New(resp.Error.Description) + if resp.Error.ErrorCode == StreamNotFound { + return nil, ErrStreamNotFound + } + return nil, resp.Error } return resp.StreamInfo, nil } @@ -830,10 +863,10 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { } if resp.Error != nil { - if resp.Error.Code == 404 { + if resp.Error.ErrorCode == StreamNotFound { return ErrStreamNotFound } - return errors.New(resp.Error.Description) + return resp.Error } return nil } @@ -934,10 +967,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt return nil, err } if resp.Error != nil { - if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") { + if resp.Error.ErrorCode == MessageNotFound { return nil, ErrMsgNotFound } - return nil, fmt.Errorf("nats: %s", resp.Error.Description) + if resp.Error.ErrorCode == StreamNotFound { + return nil, ErrStreamNotFound + } + return nil, resp.Error } msg := resp.Message @@ -1082,7 +1118,7 @@ func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteReques return err } if resp.Error != nil { - return errors.New(resp.Error.Description) + return resp.Error } return nil } @@ -1148,7 +1184,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt) if resp.Error.Code == 400 { return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body") } - return errors.New(resp.Error.Description) + return resp.Error } return nil } @@ -1215,7 +1251,7 @@ func (s *streamLister) Next() bool { return false } if resp.Error != nil { - s.err = errors.New(resp.Error.Description) + s.err = resp.Error return false } @@ -1299,7 +1335,7 @@ func (l *streamNamesLister) Next() bool { return false } if resp.Error != nil { - l.err = errors.New(resp.Error.Description) + l.err = resp.Error return false } diff --git a/nats.go b/nats.go index bea0d3dfc..7791b3293 100644 --- a/nats.go +++ b/nats.go @@ -144,7 +144,6 @@ var ( ErrNoStreamResponse = errors.New("nats: no response from stream") ErrNotJSMessage = errors.New("nats: not a jetstream message") ErrInvalidStreamName = errors.New("nats: invalid stream name") - ErrInvalidDurableName = errors.New("nats: invalid durable name") ErrInvalidConsumerName = errors.New("nats: invalid consumer name") ErrNoMatchingStream = errors.New("nats: no stream matches subject") ErrSubjectMismatch = errors.New("nats: subject does not match consumer") @@ -162,6 +161,7 @@ var ( ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer") ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") ErrConsumerNotActive = errors.New("nats: consumer not active") + ErrConsumerNameAlreadyInUse = errors.New("nats: consumer name already in use") ErrMsgNotFound = errors.New("nats: message not found") ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged") ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy") @@ -634,22 +634,21 @@ type Subscription struct { // Msg represents a message delivered by NATS. This structure is used // by Subscribers and PublishMsg(). // -// Types of Acknowledgements +// # Types of Acknowledgements // // In case using JetStream, there are multiple ways to ack a Msg: // -// // Acknowledgement that a message has been processed. -// msg.Ack() +// // Acknowledgement that a message has been processed. +// msg.Ack() // -// // Negatively acknowledges a message. -// msg.Nak() +// // Negatively acknowledges a message. +// msg.Nak() // -// // Terminate a message so that it is not redelivered further. -// msg.Term() -// -// // Signal the server that the message is being worked on and reset redelivery timer. -// msg.InProgress() +// // Terminate a message so that it is not redelivered further. +// msg.Term() // +// // Signal the server that the message is being worked on and reset redelivery timer. +// msg.InProgress() type Msg struct { Subject string Reply string diff --git a/test/js_test.go b/test/js_test.go index f644d64a3..5d6b6b497 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -336,7 +336,7 @@ func TestJetStreamSubscribe(t *testing.T) { // Check that Queue subscribe without durable name requires queue name // to not have "." in the name. _, err = js.QueueSubscribeSync("foo", "bar.baz") - if err != nats.ErrInvalidDurableName { + if err != nats.ErrInvalidConsumerName { t.Fatalf("Unexpected error: %v", err) } @@ -668,7 +668,7 @@ func TestJetStreamSubscribe(t *testing.T) { cancel() // Prevent invalid durable names - if _, err := js.SubscribeSync("baz", nats.Durable("test.durable")); err != nats.ErrInvalidDurableName { + if _, err := js.SubscribeSync("baz", nats.Durable("test.durable")); err != nats.ErrInvalidConsumerName { t.Fatalf("Expected invalid durable name error") } @@ -1136,6 +1136,12 @@ func TestJetStreamManagement(t *testing.T) { } }) + t.Run("stream with given name already exists", func(t *testing.T) { + if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Description: "desc"}); !errors.Is(err, nats.ErrStreamNameAlreadyInUse) { + t.Fatalf("Expected error: %v; got: %v", nats.ErrStreamNameAlreadyInUse, err) + } + }) + for i := 0; i < 25; i++ { js.Publish("foo", []byte("hi")) } @@ -1211,8 +1217,14 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("ConsumerInfo is not correct %+v", ci) } - if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidDurableName { - t.Fatalf("Expected invalid durable name error") + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected: %v; got: %v", nats.ErrInvalidConsumerName, err) + } + }) + + t.Run("consumer with given name already exists", func(t *testing.T) { + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}); !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) { + t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerNameAlreadyInUse, err) } }) @@ -1233,8 +1245,8 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("Expected %v, got: %v", nats.ErrInvalidStreamName, err) } _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "bad.consumer.name", AckPolicy: nats.AckExplicitPolicy}) - if err != nats.ErrInvalidDurableName { - t.Fatalf("Expected %v, got: %v", nats.ErrInvalidDurableName, err) + if err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected %v, got: %v", nats.ErrInvalidConsumerName, err) } }) @@ -1365,13 +1377,13 @@ func TestJetStreamManagement(t *testing.T) { // Check that durable name is required expected.Durable = "" _, err = js.UpdateConsumer("foo", &expected) - if err != nats.ErrInvalidDurableName { + if err != nats.ErrConsumerNameRequired { t.Fatalf("Expected consumer name required error, got %v", err) } // Check that durable name is valid expected.Durable = "bad.consumer.name" _, err = js.UpdateConsumer("foo", &expected) - if err != nats.ErrInvalidDurableName { + if err != nats.ErrInvalidConsumerName { t.Fatalf("Expected consumer name required error, got %v", err) } expected.Durable = "update_push_consumer" @@ -4790,7 +4802,7 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { t.Fatal("Unexpected success") } if !errors.Is(err, nats.ErrStreamNotFound) { - t.Fatal("Expected stream not found error", err.Error()) + t.Fatalf("Expected error: %v; got: %v", nats.ErrStreamNotFound, err) } }) @@ -4799,8 +4811,9 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { if err == nil { t.Fatal("Unexpected success") } - if err.Error() != `nats: consumer filter subject is not a valid subset of the interest subjects` { - t.Fatal("Expected stream not found error") + apiErr := &nats.APIError{} + if !errors.As(err, &apiErr) || apiErr.ErrorCode != 10093 { + t.Fatalf("Expected API error 10093; got: %v", err) } }) From 82272a03d967d11efa2cbb92a352853fcb02c94a Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 16 Aug 2022 09:54:34 +0200 Subject: [PATCH 2/2] Address review comments --- js.go | 8 +-- jsm.go | 44 +++++++------ nats.go | 165 +++++++++++++++++++++++++----------------------- test/js_test.go | 24 ++++++- 4 files changed, 137 insertions(+), 104 deletions(-) diff --git a/js.go b/js.go index c52b44a55..9b150eabb 100644 --- a/js.go +++ b/js.go @@ -1654,7 +1654,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, cleanUpSub() } if consumer != _EMPTY_ && - (cinfo.Error.ErrorCode == ConsumerAlreadyExists || cinfo.Error.ErrorCode == ConsumerNameExists) { + (cinfo.Error.ErrorCode == JSErrCodeConsumerAlreadyExists || cinfo.Error.ErrorCode == JSErrCodeConsumerNameExists) { info, err = js.ConsumerInfo(stream, consumer) if err != nil { @@ -1691,7 +1691,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, hasHeartbeats = info.Config.Heartbeat > 0 } } else { - if cinfo.Error.ErrorCode == StreamNotFound { + if cinfo.Error.ErrorCode == JSErrCodeStreamNotFound { return nil, ErrStreamNotFound } return nil, cinfo.Error @@ -2750,10 +2750,10 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin return nil, err } if info.Error != nil { - if info.Error.ErrorCode == ConsumerNotFound { + if info.Error.ErrorCode == JSErrCodeConsumerNotFound { return nil, ErrConsumerNotFound } - if info.Error.ErrorCode == StreamNotFound { + if info.Error.ErrorCode == JSErrCodeStreamNotFound { return nil, ErrStreamNotFound } return nil, info.Error diff --git a/jsm.go b/jsm.go index fa7457cf7..e7bf8a88f 100644 --- a/jsm.go +++ b/jsm.go @@ -222,16 +222,17 @@ type accountInfoResponse struct { type ErrorCode uint16 const ( - JetStreamNotEnabledForAccount ErrorCode = 10039 + JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039 + JSErrCodeJetStreamNotEnabled ErrorCode = 10076 - StreamNotFound ErrorCode = 10059 - StreamNameInUse ErrorCode = 10058 + JSErrCodeStreamNotFound ErrorCode = 10059 + JSErrCodeStreamNameInUse ErrorCode = 10058 - ConsumerNotFound ErrorCode = 10014 - ConsumerNameExists ErrorCode = 10013 - ConsumerAlreadyExists ErrorCode = 10105 + JSErrCodeConsumerNotFound ErrorCode = 10014 + JSErrCodeConsumerNameExists ErrorCode = 10013 + JSErrCodeConsumerAlreadyExists ErrorCode = 10105 - MessageNotFound ErrorCode = 10037 + JSErrCodeMessageNotFound ErrorCode = 10037 ) // Error prints the JetStream API error code and description @@ -264,7 +265,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { return nil, err } if info.Error != nil { - if info.Error.ErrorCode == JetStreamNotEnabledForAccount { + if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabledForAccount { + return nil, ErrJetStreamNotEnabledForAccount + } + if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabled { return nil, ErrJetStreamNotEnabled } return nil, info.Error @@ -290,8 +294,12 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) { return nil, err } + if consInfo != nil { - return nil, fmt.Errorf("creating consumer %q on stream %q: %w", cfg.Durable, stream, ErrConsumerNameAlreadyInUse) + sameConfig := checkConfig(&consInfo.Config, cfg) + if sameConfig != nil { + return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream) + } } } @@ -348,10 +356,10 @@ func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) return nil, err } if info.Error != nil { - if info.Error.ErrorCode == StreamNotFound { + if info.Error.ErrorCode == JSErrCodeStreamNotFound { return nil, ErrStreamNotFound } - if info.Error.ErrorCode == ConsumerNotFound { + if info.Error.ErrorCode == JSErrCodeConsumerNotFound { return nil, ErrConsumerNotFound } return nil, info.Error @@ -414,7 +422,7 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { } if resp.Error != nil { - if resp.Error.ErrorCode == ConsumerNotFound { + if resp.Error.ErrorCode == JSErrCodeConsumerNotFound { return ErrConsumerNotFound } return resp.Error @@ -685,7 +693,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { return nil, err } if resp.Error != nil { - if resp.Error.ErrorCode == StreamNameInUse { + if resp.Error.ErrorCode == JSErrCodeStreamNameInUse { return nil, ErrStreamNameAlreadyInUse } return nil, resp.Error @@ -733,7 +741,7 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { return nil, err } if resp.Error != nil { - if resp.Error.ErrorCode == StreamNotFound { + if resp.Error.ErrorCode == JSErrCodeStreamNotFound { return nil, ErrStreamNotFound } return nil, resp.Error @@ -825,7 +833,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error return nil, err } if resp.Error != nil { - if resp.Error.ErrorCode == StreamNotFound { + if resp.Error.ErrorCode == JSErrCodeStreamNotFound { return nil, ErrStreamNotFound } return nil, resp.Error @@ -863,7 +871,7 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { } if resp.Error != nil { - if resp.Error.ErrorCode == StreamNotFound { + if resp.Error.ErrorCode == JSErrCodeStreamNotFound { return ErrStreamNotFound } return resp.Error @@ -967,10 +975,10 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt return nil, err } if resp.Error != nil { - if resp.Error.ErrorCode == MessageNotFound { + if resp.Error.ErrorCode == JSErrCodeMessageNotFound { return nil, ErrMsgNotFound } - if resp.Error.ErrorCode == StreamNotFound { + if resp.Error.ErrorCode == JSErrCodeStreamNotFound { return nil, ErrStreamNotFound } return nil, resp.Error diff --git a/nats.go b/nats.go index 7791b3293..dbd250c0b 100644 --- a/nats.go +++ b/nats.go @@ -90,86 +90,91 @@ const ( // Errors var ( - ErrConnectionClosed = errors.New("nats: connection closed") - ErrConnectionDraining = errors.New("nats: connection draining") - ErrDrainTimeout = errors.New("nats: draining connection timed out") - ErrConnectionReconnecting = errors.New("nats: connection reconnecting") - ErrSecureConnRequired = errors.New("nats: secure connection required") - ErrSecureConnWanted = errors.New("nats: secure connection not available") - ErrBadSubscription = errors.New("nats: invalid subscription") - ErrTypeSubscription = errors.New("nats: invalid subscription type") - ErrBadSubject = errors.New("nats: invalid subject") - ErrBadQueueName = errors.New("nats: invalid queue name") - ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") - ErrTimeout = errors.New("nats: timeout") - ErrBadTimeout = errors.New("nats: timeout invalid") - ErrAuthorization = errors.New("nats: authorization violation") - ErrAuthExpired = errors.New("nats: authentication expired") - ErrAuthRevoked = errors.New("nats: authentication revoked") - ErrAccountAuthExpired = errors.New("nats: account authentication expired") - ErrNoServers = errors.New("nats: no servers available for connection") - ErrJsonParse = errors.New("nats: connect message, json parse error") - ErrChanArg = errors.New("nats: argument needs to be a channel type") - ErrMaxPayload = errors.New("nats: maximum payload exceeded") - ErrMaxMessages = errors.New("nats: maximum messages delivered") - ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") - ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") - ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") - ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") - ErrInvalidConnection = errors.New("nats: invalid connection") - ErrInvalidMsg = errors.New("nats: invalid message or message nil") - ErrInvalidArg = errors.New("nats: invalid argument") - ErrInvalidContext = errors.New("nats: invalid context") - ErrNoDeadlineContext = errors.New("nats: context requires a deadline") - ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") - ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") - ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") - ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") - ErrNoUserCB = errors.New("nats: user callback not defined") - ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") - ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") - ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) - ErrTokenAlreadySet = errors.New("nats: token and token handler both set") - ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") - ErrMsgNoReply = errors.New("nats: message does not have a reply") - ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") - ErrDisconnected = errors.New("nats: server is disconnected") - ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") - ErrBadHeaderMsg = errors.New("nats: message could not decode headers") - ErrNoResponders = errors.New("nats: no responders available for request") - ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") - ErrPullModeNotAllowed = errors.New("nats: pull based not supported") - ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") - ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") - ErrNoStreamResponse = errors.New("nats: no response from stream") - ErrNotJSMessage = errors.New("nats: not a jetstream message") - ErrInvalidStreamName = errors.New("nats: invalid stream name") - ErrInvalidConsumerName = errors.New("nats: invalid consumer name") - ErrNoMatchingStream = errors.New("nats: no stream matches subject") - ErrSubjectMismatch = errors.New("nats: subject does not match consumer") - ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") - ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") - ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") - ErrStreamConfigRequired = errors.New("nats: stream configuration is required") - ErrStreamNameRequired = errors.New("nats: stream name is required") - ErrStreamNotFound = errors.New("nats: stream not found") - ErrConsumerNotFound = errors.New("nats: consumer not found") - ErrConsumerNameRequired = errors.New("nats: consumer name is required") - ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") - ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") - ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required") - ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer") - ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") - ErrConsumerNotActive = errors.New("nats: consumer not active") - ErrConsumerNameAlreadyInUse = errors.New("nats: consumer name already in use") - ErrMsgNotFound = errors.New("nats: message not found") - ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged") - ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy") - ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed") - ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use") - ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") - ErrBadRequest = errors.New("nats: bad request") - ErrConnectionNotTLS = errors.New("nats: connection is not tls") + ErrConnectionClosed = errors.New("nats: connection closed") + ErrConnectionDraining = errors.New("nats: connection draining") + ErrDrainTimeout = errors.New("nats: draining connection timed out") + ErrConnectionReconnecting = errors.New("nats: connection reconnecting") + ErrSecureConnRequired = errors.New("nats: secure connection required") + ErrSecureConnWanted = errors.New("nats: secure connection not available") + ErrBadSubscription = errors.New("nats: invalid subscription") + ErrTypeSubscription = errors.New("nats: invalid subscription type") + ErrBadSubject = errors.New("nats: invalid subject") + ErrBadQueueName = errors.New("nats: invalid queue name") + ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") + ErrTimeout = errors.New("nats: timeout") + ErrBadTimeout = errors.New("nats: timeout invalid") + ErrAuthorization = errors.New("nats: authorization violation") + ErrAuthExpired = errors.New("nats: authentication expired") + ErrAuthRevoked = errors.New("nats: authentication revoked") + ErrAccountAuthExpired = errors.New("nats: account authentication expired") + ErrNoServers = errors.New("nats: no servers available for connection") + ErrJsonParse = errors.New("nats: connect message, json parse error") + ErrChanArg = errors.New("nats: argument needs to be a channel type") + ErrMaxPayload = errors.New("nats: maximum payload exceeded") + ErrMaxMessages = errors.New("nats: maximum messages delivered") + ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") + ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") + ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") + ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") + ErrInvalidConnection = errors.New("nats: invalid connection") + ErrInvalidMsg = errors.New("nats: invalid message or message nil") + ErrInvalidArg = errors.New("nats: invalid argument") + ErrInvalidContext = errors.New("nats: invalid context") + ErrNoDeadlineContext = errors.New("nats: context requires a deadline") + ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") + ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") + ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") + ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") + ErrNoUserCB = errors.New("nats: user callback not defined") + ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") + ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") + ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) + ErrTokenAlreadySet = errors.New("nats: token and token handler both set") + ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") + ErrMsgNoReply = errors.New("nats: message does not have a reply") + ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") + ErrDisconnected = errors.New("nats: server is disconnected") + ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") + ErrBadHeaderMsg = errors.New("nats: message could not decode headers") + ErrNoResponders = errors.New("nats: no responders available for request") + ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") + ErrPullModeNotAllowed = errors.New("nats: pull based not supported") + ErrJetStreamNotEnabledForAccount = errors.New("nats: jetstream not enabled for this account") + ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") + ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") + ErrNoStreamResponse = errors.New("nats: no response from stream") + ErrNotJSMessage = errors.New("nats: not a jetstream message") + ErrInvalidStreamName = errors.New("nats: invalid stream name") + ErrInvalidConsumerName = errors.New("nats: invalid consumer name") + ErrNoMatchingStream = errors.New("nats: no stream matches subject") + ErrSubjectMismatch = errors.New("nats: subject does not match consumer") + ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") + ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") + ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") + ErrStreamConfigRequired = errors.New("nats: stream configuration is required") + ErrStreamNameRequired = errors.New("nats: stream name is required") + ErrStreamNotFound = errors.New("nats: stream not found") + ErrConsumerNotFound = errors.New("nats: consumer not found") + ErrConsumerNameRequired = errors.New("nats: consumer name is required") + ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") + ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") + ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required") + ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer") + ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") + ErrConsumerNotActive = errors.New("nats: consumer not active") + ErrConsumerNameAlreadyInUse = errors.New("nats: consumer name already in use") + ErrMsgNotFound = errors.New("nats: message not found") + ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged") + ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy") + ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed") + ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use") + ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") + ErrBadRequest = errors.New("nats: bad request") + ErrConnectionNotTLS = errors.New("nats: connection is not tls") + + // DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases + // Use ErrInvalidConsumerName instead + ErrInvalidDurableName = errors.New("nats: invalid durable name") ) func init() { diff --git a/test/js_test.go b/test/js_test.go index 5d6b6b497..ba7266e90 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -87,7 +87,7 @@ func TestJetStreamNotAccountEnabled(t *testing.T) { nc, js := jsClient(t, s) defer nc.Close() - if _, err := js.AccountInfo(); err != nats.ErrJetStreamNotEnabled { + if _, err := js.AccountInfo(); err != nats.ErrJetStreamNotEnabledForAccount { t.Fatalf("Did not get the proper error, got %v", err) } } @@ -1223,9 +1223,15 @@ func TestJetStreamManagement(t *testing.T) { }) t.Run("consumer with given name already exists", func(t *testing.T) { - if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}); !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) { + // configs do not match + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckAllPolicy}); !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) { t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerNameAlreadyInUse, err) } + + // configs are the same + if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}); err != nil { + t.Fatalf("Expected no error; got: %v", err) + } }) t.Run("create consumer on missing stream", func(t *testing.T) { @@ -1590,6 +1596,20 @@ func TestAccountInfo(t *testing.T) { `, withError: nats.ErrJetStreamNotEnabled, }, + { + name: "jetstream not enabled for account", + cfg: ` + listen: 127.0.0.1:-1 + no_auth_user: foo + jetstream: enabled + accounts: { + A: { + users: [ {user: foo} ] + }, + } + `, + withError: nats.ErrJetStreamNotEnabledForAccount, + }, } for _, test := range tests {