Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Add JetStream error codes, extract ErrConsumerNameAlreadyInUse #1044

Merged
merged 2 commits into from Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 18 additions & 22 deletions js.go
Expand Up @@ -524,7 +524,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
return nil, ErrInvalidJSAck
}
if pa.Error != nil {
return nil, fmt.Errorf("nats: %s", pa.Error.Description)
return nil, pa.Error
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
return nil, ErrInvalidJSAck
Expand Down Expand Up @@ -728,7 +728,7 @@ func (js *js) handleAsyncReply(m *Msg) {
return
}
if pa.Error != nil {
doErr(fmt.Errorf("nats: %s", pa.Error.Description))
doErr(pa.Error)
return
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
Expand Down Expand Up @@ -1228,7 +1228,10 @@ func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOp
// See important note in Subscribe()
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))
if durable != "" {
opts = append(opts, Durable(durable))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change behavior to support the ephemerals?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, pull ephemerals are already supported. This change is introduced because nats.Durable() should not accept empty consumer name (or else checkConsumerName() will fail)

return js.subscribe(subj, _EMPTY_, nil, mch, true, true, opts)
}

func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
Expand Down Expand Up @@ -1394,7 +1397,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// If this is a queue subscription and no consumer nor durable name was specified,
// then we will use the queue name as a durable name.
if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ {
if err := checkDurName(queue); err != nil {
if err := checkConsumerName(queue); err != nil {
return nil, err
}
o.cfg.Durable = queue
Expand Down Expand Up @@ -1651,8 +1654,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
cleanUpSub()
}
if consumer != _EMPTY_ &&
(strings.Contains(cinfo.Error.Description, `consumer already exists`) ||
strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
(cinfo.Error.ErrorCode == ConsumerAlreadyExists || cinfo.Error.ErrorCode == ConsumerNameExists) {

info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
Expand Down Expand Up @@ -1689,10 +1691,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.Code == 404 {
if cinfo.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
return nil, cinfo.Error
}
} else {
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
Expand Down Expand Up @@ -1970,7 +1972,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
}

if cinfo.Error != nil {
pushErr(fmt.Errorf("nats: %s", cinfo.Error.Description))
pushErr(cinfo.Error)
return
}

Expand Down Expand Up @@ -2185,17 +2187,8 @@ func Description(description string) SubOpt {
})
}

// Check that the durable name is valid, that is, that it does not contain
// any ".", and if it does return ErrInvalidDurableName, otherwise nil.
func checkDurName(dur string) error {
if strings.Contains(dur, ".") {
return ErrInvalidDurableName
}
return nil
}

// Durable defines the consumer name for JetStream durable subscribers.
// This function will return ErrInvalidDurableName in the name contains
// This function will return ErrInvalidConsumerName in the name contains
// any dot ".".
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand All @@ -2205,7 +2198,7 @@ func Durable(consumer string) SubOpt {
if opts.consumer != _EMPTY_ && opts.consumer != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
}
if err := checkDurName(consumer); err != nil {
if err := checkConsumerName(consumer); err != nil {
return err
}

Expand Down Expand Up @@ -2757,10 +2750,13 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
if info.Error.ErrorCode == ConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, fmt.Errorf("nats: %s", info.Error.Description)
if info.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, info.Error
}
return info.ConsumerInfo, nil
}
Expand Down
122 changes: 79 additions & 43 deletions jsm.go
Expand Up @@ -156,9 +156,9 @@ type ExternalStream struct {

// APIError is included in all API responses if there was an error.
type APIError struct {
Code int `json:"code"`
ErrorCode int `json:"err_code"`
Description string `json:"description,omitempty"`
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// apiResponse is a standard response from the JetStream JSON API
Expand Down Expand Up @@ -219,6 +219,26 @@ type accountInfoResponse struct {
AccountInfo
}

type ErrorCode uint16

const (
JetStreamNotEnabledForAccount ErrorCode = 10039

StreamNotFound ErrorCode = 10059
StreamNameInUse ErrorCode = 10058

ConsumerNotFound ErrorCode = 10014
ConsumerNameExists ErrorCode = 10013
ConsumerAlreadyExists ErrorCode = 10105

MessageNotFound ErrorCode = 10037
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit worried that we are putting too many of these into the nats namespace package, wonder if it is time to move them into their own package... Or maybe suffix them like MessageNotFoundErrorCode? code in the client then would read like: esp.Error.ErrorCode == MessageNotFoundErrorCode

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about extracting those codes together with errors to a separate package, but moving already defined errors would be a breaking change, so I decided against it. I think the format with ErrorCode prefix makes sense, I'll change that.

)

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
Expand All @@ -244,13 +264,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
return nil, err
}
if info.Error != nil {
var err error
if strings.Contains(info.Error.Description, "not enabled for") {
err = ErrJetStreamNotEnabled
} else {
err = errors.New(info.Error.Description)
if info.Error.ErrorCode == JetStreamNotEnabledForAccount {
return nil, ErrJetStreamNotEnabled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the server not enabled for account and not enabled looks like they are two different errors actually... maybe we should start distinguishing them too and use ErrJetStreamNotEnabledForAccount?

https://github.com/nats-io/nats-server/blob/52c4872666a26acdf475cb89ae9b5f367324fadd/server/jetstream_errors_generated.go#L472-L474

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another factor why maybe we should clean it up is that JetStreamNotEnabledForAccount is a hard error since means that account can never use JetStream in current setup, whereas things are a bit less certain in ErrJetStreamNotEnabled, it can be either a temporary error due to JS not ready yet.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, distinguishing these errors is a good idea.

}
return nil, err
return nil, info.Error
}

return &info.AccountInfo, nil
Expand All @@ -268,6 +285,30 @@ type consumerResponse struct {

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg != nil && cfg.Durable != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, cfg.Durable)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is good to call Info before, but also changes the behavior slightly since need to give permissions to info in across account cases which they probably would have anyway.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would lean towards leaving that, as without that AddConsumer() I feel does not work as the name suggests (it will simply update a consumer if it already exists).

if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}
if consInfo != nil {
return nil, fmt.Errorf("creating consumer %q on stream %q: %w", cfg.Durable, stream, ErrConsumerNameAlreadyInUse)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think need to add nats: prefix here? maybe not an error if the intended config is the same as the result of Info (idempotent) ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, idempotency check is a good idea. As to nats: prefix, I'll just reverse the order of the message and put ErrConsumerNameAlreadyInUse at the beginning.

}
}

return js.upsertConsumer(stream, cfg, opts...)
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrConsumerNameRequired
}
return js.upsertConsumer(stream, cfg, opts...)
}

func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
Expand All @@ -286,7 +327,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C

var ccSubj string
if cfg != nil && cfg.Durable != _EMPTY_ {
if err := checkDurName(cfg.Durable); err != nil {
if err := checkConsumerName(cfg.Durable); err != nil {
return nil, err
}
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
Expand All @@ -307,30 +348,17 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == 10059 {
if info.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
if info.Error.Code == 404 {
if info.Error.ErrorCode == ConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, errors.New(info.Error.Description)
return nil, info.Error
}
return info.ConsumerInfo, nil
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrInvalidDurableName
}
return js.AddConsumer(stream, cfg, opts...)
}

// consumerDeleteResponse is the response for a Consumer delete request.
type consumerDeleteResponse struct {
apiResponse
Expand All @@ -347,6 +375,8 @@ func checkStreamName(stream string) error {
return nil
}

// Check that the durable name exists and is valid, that is, that it does not contain any "."
// Returns ErrConsumerNameRequired if consumer name is empty, ErrInvalidConsumerName is invalid, otherwise nil
func checkConsumerName(consumer string) error {
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
Expand Down Expand Up @@ -384,10 +414,10 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == ConsumerNotFound {
Copy link
Member

@wallyqs wallyqs Aug 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if we could make these ErrConsumerNotFound plain error types, instead conform to the Error interface and maybe a JetStreamAPIError interface and stamp them with their error codes there, something like:

ErrConsumerNotFound             = &apiError{ErrorCode: ...}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I have an idea on how to make the implementation a bit cleaner, but that would require changing the way errors are compared:

  1. As you mentioned, change the defined error types to APIError, like so:
    ErrConsumerNotFound = &APIError{ErrorCode: 10014, Code: 404, Description: "nats: consumer not found"}
  2. Implement custom Is() method on APIError, comparing error codes:
func (e *APIError) Is(target error) bool {
	err, ok := target.(*APIError)
	if !ok {
		return false
	}
	return err.ErrorCode == e.ErrorCode && err.Code == e.Code
}
  1. Simplify client method implementations so that we do not have to compare the error codes manually at all, e.g.:
	if info.Error != nil {
		return nil, info.Error
	}

instead of

	if info.Error != nil {
		if info.Error.ErrorCode == ConsumerNotFound {
			return nil, ErrConsumerNotFound
		}
		if info.Error.ErrorCode == StreamNotFound {
			return nil, ErrStreamNotFound
		}
		return nil, info.Error
	}

However, that means user would not be able to check errors by simple comparison (err == ErrConsumerNotFound), but instead rely on errors.Is(err, ErrConsumerNotFound) (which usually should be the preferred way to go).

The other approach would be to leave the client method implementations untouched and simply change the variable definitions to be &APIError{} instead of errors.New().

What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what do you mean by implementing JetStreamAPIError interface (i.e. what would it do). The error interface is implemented as part of this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put together an example of what we could do via an interface here: #1047

return ErrConsumerNotFound
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -474,7 +504,7 @@ func (c *consumerLister) Next() bool {
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
c.err = resp.Error
return false
}

Expand Down Expand Up @@ -571,7 +601,7 @@ func (c *consumerNamesLister) Next() bool {
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
c.err = resp.Error
return false
}

Expand Down Expand Up @@ -655,10 +685,10 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == 10058 {
if resp.Error.ErrorCode == StreamNameInUse {
return nil, ErrStreamNameAlreadyInUse
}
return nil, errors.New(resp.Error.Description)
return nil, resp.Error
}

return resp.StreamInfo, nil
Expand Down Expand Up @@ -703,10 +733,10 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
return nil, resp.Error
}

return resp.StreamInfo, nil
Expand Down Expand Up @@ -795,7 +825,10 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
if resp.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}
return resp.StreamInfo, nil
}
Expand Down Expand Up @@ -830,10 +863,10 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == StreamNotFound {
return ErrStreamNotFound
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -934,10 +967,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") {
if resp.Error.ErrorCode == MessageNotFound {
return nil, ErrMsgNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
if resp.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

msg := resp.Message
Expand Down Expand Up @@ -1082,7 +1118,7 @@ func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteReques
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -1148,7 +1184,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt)
if resp.Error.Code == 400 {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -1215,7 +1251,7 @@ func (s *streamLister) Next() bool {
return false
}
if resp.Error != nil {
s.err = errors.New(resp.Error.Description)
s.err = resp.Error
return false
}

Expand Down Expand Up @@ -1299,7 +1335,7 @@ func (l *streamNamesLister) Next() bool {
return false
}
if resp.Error != nil {
l.err = errors.New(resp.Error.Description)
l.err = resp.Error
return false
}

Expand Down