New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IMPROVED] Add JetStream error codes, extract ErrConsumerNameAlreadyInUse #1044
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -156,9 +156,9 @@ type ExternalStream struct { | |
|
||
// APIError is included in all API responses if there was an error. | ||
type APIError struct { | ||
Code int `json:"code"` | ||
ErrorCode int `json:"err_code"` | ||
Description string `json:"description,omitempty"` | ||
Code int `json:"code"` | ||
ErrorCode ErrorCode `json:"err_code"` | ||
Description string `json:"description,omitempty"` | ||
} | ||
|
||
// apiResponse is a standard response from the JetStream JSON API | ||
|
@@ -219,6 +219,26 @@ type accountInfoResponse struct { | |
AccountInfo | ||
} | ||
|
||
type ErrorCode uint16 | ||
|
||
const ( | ||
JetStreamNotEnabledForAccount ErrorCode = 10039 | ||
|
||
StreamNotFound ErrorCode = 10059 | ||
StreamNameInUse ErrorCode = 10058 | ||
|
||
ConsumerNotFound ErrorCode = 10014 | ||
ConsumerNameExists ErrorCode = 10013 | ||
ConsumerAlreadyExists ErrorCode = 10105 | ||
|
||
MessageNotFound ErrorCode = 10037 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit worried that we are putting too many of these into the nats namespace package, wonder if it is time to move them into their own package... Or maybe suffix them like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking about extracting those codes together with errors to a separate package, but moving already defined errors would be a breaking change, so I decided against it. I think the format with |
||
) | ||
|
||
// Error prints the JetStream API error code and description | ||
func (e *APIError) Error() string { | ||
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description) | ||
} | ||
|
||
// AccountInfo retrieves info about the JetStream usage from the current account. | ||
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled | ||
// Other errors can happen but are generally considered retryable | ||
|
@@ -244,13 +264,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { | |
return nil, err | ||
} | ||
if info.Error != nil { | ||
var err error | ||
if strings.Contains(info.Error.Description, "not enabled for") { | ||
err = ErrJetStreamNotEnabled | ||
} else { | ||
err = errors.New(info.Error.Description) | ||
if info.Error.ErrorCode == JetStreamNotEnabledForAccount { | ||
return nil, ErrJetStreamNotEnabled | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the server not enabled for account and not enabled looks like they are two different errors actually... maybe we should start distinguishing them too and use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another factor why maybe we should clean it up is that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, distinguishing these errors is a good idea. |
||
} | ||
return nil, err | ||
return nil, info.Error | ||
} | ||
|
||
return &info.AccountInfo, nil | ||
|
@@ -268,6 +285,30 @@ type consumerResponse struct { | |
|
||
// AddConsumer will add a JetStream consumer. | ||
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { | ||
if cfg != nil && cfg.Durable != _EMPTY_ { | ||
consInfo, err := js.ConsumerInfo(stream, cfg.Durable) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is good to call Info before, but also changes the behavior slightly since need to give permissions to info in across account cases which they probably would have anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I would lean towards leaving that, as without that |
||
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) { | ||
return nil, err | ||
} | ||
if consInfo != nil { | ||
return nil, fmt.Errorf("creating consumer %q on stream %q: %w", cfg.Durable, stream, ErrConsumerNameAlreadyInUse) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think need to add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, idempotency check is a good idea. As to |
||
} | ||
} | ||
|
||
return js.upsertConsumer(stream, cfg, opts...) | ||
} | ||
|
||
func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { | ||
if cfg == nil { | ||
return nil, ErrConsumerConfigRequired | ||
} | ||
if cfg.Durable == _EMPTY_ { | ||
return nil, ErrConsumerNameRequired | ||
} | ||
return js.upsertConsumer(stream, cfg, opts...) | ||
} | ||
|
||
func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { | ||
if err := checkStreamName(stream); err != nil { | ||
return nil, err | ||
} | ||
|
@@ -286,7 +327,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C | |
|
||
var ccSubj string | ||
if cfg != nil && cfg.Durable != _EMPTY_ { | ||
if err := checkDurName(cfg.Durable); err != nil { | ||
if err := checkConsumerName(cfg.Durable); err != nil { | ||
return nil, err | ||
} | ||
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable) | ||
|
@@ -307,30 +348,17 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C | |
return nil, err | ||
} | ||
if info.Error != nil { | ||
if info.Error.ErrorCode == 10059 { | ||
if info.Error.ErrorCode == StreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
if info.Error.Code == 404 { | ||
if info.Error.ErrorCode == ConsumerNotFound { | ||
return nil, ErrConsumerNotFound | ||
} | ||
return nil, errors.New(info.Error.Description) | ||
return nil, info.Error | ||
} | ||
return info.ConsumerInfo, nil | ||
} | ||
|
||
func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { | ||
if err := checkStreamName(stream); err != nil { | ||
return nil, err | ||
} | ||
if cfg == nil { | ||
return nil, ErrConsumerConfigRequired | ||
} | ||
if cfg.Durable == _EMPTY_ { | ||
return nil, ErrInvalidDurableName | ||
} | ||
return js.AddConsumer(stream, cfg, opts...) | ||
} | ||
|
||
// consumerDeleteResponse is the response for a Consumer delete request. | ||
type consumerDeleteResponse struct { | ||
apiResponse | ||
|
@@ -347,6 +375,8 @@ func checkStreamName(stream string) error { | |
return nil | ||
} | ||
|
||
// Check that the durable name exists and is valid, that is, that it does not contain any "." | ||
// Returns ErrConsumerNameRequired if consumer name is empty, ErrInvalidConsumerName is invalid, otherwise nil | ||
func checkConsumerName(consumer string) error { | ||
if consumer == _EMPTY_ { | ||
return ErrConsumerNameRequired | ||
|
@@ -384,10 +414,10 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { | |
} | ||
|
||
if resp.Error != nil { | ||
if resp.Error.Code == 404 { | ||
if resp.Error.ErrorCode == ConsumerNotFound { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wonder if we could make these ErrConsumerNotFound = &apiError{ErrorCode: ...} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call. I have an idea on how to make the implementation a bit cleaner, but that would require changing the way errors are compared:
instead of
However, that means user would not be able to check errors by simple comparison ( The other approach would be to leave the client method implementations untouched and simply change the variable definitions to be What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, what do you mean by implementing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I put together an example of what we could do via an interface here: #1047 |
||
return ErrConsumerNotFound | ||
} | ||
return errors.New(resp.Error.Description) | ||
return resp.Error | ||
} | ||
return nil | ||
} | ||
|
@@ -474,7 +504,7 @@ func (c *consumerLister) Next() bool { | |
return false | ||
} | ||
if resp.Error != nil { | ||
c.err = errors.New(resp.Error.Description) | ||
c.err = resp.Error | ||
return false | ||
} | ||
|
||
|
@@ -571,7 +601,7 @@ func (c *consumerNamesLister) Next() bool { | |
return false | ||
} | ||
if resp.Error != nil { | ||
c.err = errors.New(resp.Error.Description) | ||
c.err = resp.Error | ||
return false | ||
} | ||
|
||
|
@@ -655,10 +685,10 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { | |
return nil, err | ||
} | ||
if resp.Error != nil { | ||
if resp.Error.ErrorCode == 10058 { | ||
if resp.Error.ErrorCode == StreamNameInUse { | ||
return nil, ErrStreamNameAlreadyInUse | ||
} | ||
return nil, errors.New(resp.Error.Description) | ||
return nil, resp.Error | ||
} | ||
|
||
return resp.StreamInfo, nil | ||
|
@@ -703,10 +733,10 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { | |
return nil, err | ||
} | ||
if resp.Error != nil { | ||
if resp.Error.Code == 404 { | ||
if resp.Error.ErrorCode == StreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
return nil, fmt.Errorf("nats: %s", resp.Error.Description) | ||
return nil, resp.Error | ||
} | ||
|
||
return resp.StreamInfo, nil | ||
|
@@ -795,7 +825,10 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error | |
return nil, err | ||
} | ||
if resp.Error != nil { | ||
return nil, errors.New(resp.Error.Description) | ||
if resp.Error.ErrorCode == StreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
return nil, resp.Error | ||
} | ||
return resp.StreamInfo, nil | ||
} | ||
|
@@ -830,10 +863,10 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { | |
} | ||
|
||
if resp.Error != nil { | ||
if resp.Error.Code == 404 { | ||
if resp.Error.ErrorCode == StreamNotFound { | ||
return ErrStreamNotFound | ||
} | ||
return errors.New(resp.Error.Description) | ||
return resp.Error | ||
} | ||
return nil | ||
} | ||
|
@@ -934,10 +967,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt | |
return nil, err | ||
} | ||
if resp.Error != nil { | ||
if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") { | ||
if resp.Error.ErrorCode == MessageNotFound { | ||
return nil, ErrMsgNotFound | ||
} | ||
return nil, fmt.Errorf("nats: %s", resp.Error.Description) | ||
if resp.Error.ErrorCode == StreamNotFound { | ||
return nil, ErrStreamNotFound | ||
} | ||
return nil, resp.Error | ||
} | ||
|
||
msg := resp.Message | ||
|
@@ -1082,7 +1118,7 @@ func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteReques | |
return err | ||
} | ||
if resp.Error != nil { | ||
return errors.New(resp.Error.Description) | ||
return resp.Error | ||
} | ||
return nil | ||
} | ||
|
@@ -1148,7 +1184,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt) | |
if resp.Error.Code == 400 { | ||
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body") | ||
} | ||
return errors.New(resp.Error.Description) | ||
return resp.Error | ||
} | ||
return nil | ||
} | ||
|
@@ -1215,7 +1251,7 @@ func (s *streamLister) Next() bool { | |
return false | ||
} | ||
if resp.Error != nil { | ||
s.err = errors.New(resp.Error.Description) | ||
s.err = resp.Error | ||
return false | ||
} | ||
|
||
|
@@ -1299,7 +1335,7 @@ func (l *streamNamesLister) Next() bool { | |
return false | ||
} | ||
if resp.Error != nil { | ||
l.err = errors.New(resp.Error.Description) | ||
l.err = resp.Error | ||
return false | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this change behavior to support the ephemerals?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, pull ephemerals are already supported. This change is introduced because
nats.Durable()
should not accept empty consumer name (or elsecheckConsumerName()
will fail)