Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Add JetStream error codes, extract ErrConsumerNameAlreadyInUse #1044

Merged
merged 2 commits into from Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 18 additions & 22 deletions js.go
Expand Up @@ -524,7 +524,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
return nil, ErrInvalidJSAck
}
if pa.Error != nil {
return nil, fmt.Errorf("nats: %s", pa.Error.Description)
return nil, pa.Error
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
return nil, ErrInvalidJSAck
Expand Down Expand Up @@ -728,7 +728,7 @@ func (js *js) handleAsyncReply(m *Msg) {
return
}
if pa.Error != nil {
doErr(fmt.Errorf("nats: %s", pa.Error.Description))
doErr(pa.Error)
return
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
Expand Down Expand Up @@ -1228,7 +1228,10 @@ func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOp
// See important note in Subscribe()
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))
if durable != "" {
opts = append(opts, Durable(durable))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change behavior to support the ephemerals?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, pull ephemerals are already supported. This change is introduced because nats.Durable() should not accept empty consumer name (or else checkConsumerName() will fail)

return js.subscribe(subj, _EMPTY_, nil, mch, true, true, opts)
}

func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
Expand Down Expand Up @@ -1394,7 +1397,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// If this is a queue subscription and no consumer nor durable name was specified,
// then we will use the queue name as a durable name.
if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ {
if err := checkDurName(queue); err != nil {
if err := checkConsumerName(queue); err != nil {
return nil, err
}
o.cfg.Durable = queue
Expand Down Expand Up @@ -1651,8 +1654,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
cleanUpSub()
}
if consumer != _EMPTY_ &&
(strings.Contains(cinfo.Error.Description, `consumer already exists`) ||
strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
(cinfo.Error.ErrorCode == JSErrCodeConsumerAlreadyExists || cinfo.Error.ErrorCode == JSErrCodeConsumerNameExists) {

info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
Expand Down Expand Up @@ -1689,10 +1691,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.Code == 404 {
if cinfo.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
return nil, cinfo.Error
}
} else {
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
Expand Down Expand Up @@ -1970,7 +1972,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
}

if cinfo.Error != nil {
pushErr(fmt.Errorf("nats: %s", cinfo.Error.Description))
pushErr(cinfo.Error)
return
}

Expand Down Expand Up @@ -2185,17 +2187,8 @@ func Description(description string) SubOpt {
})
}

// Check that the durable name is valid, that is, that it does not contain
// any ".", and if it does return ErrInvalidDurableName, otherwise nil.
func checkDurName(dur string) error {
if strings.Contains(dur, ".") {
return ErrInvalidDurableName
}
return nil
}

// Durable defines the consumer name for JetStream durable subscribers.
// This function will return ErrInvalidDurableName in the name contains
// This function will return ErrInvalidConsumerName in the name contains
// any dot ".".
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand All @@ -2205,7 +2198,7 @@ func Durable(consumer string) SubOpt {
if opts.consumer != _EMPTY_ && opts.consumer != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
}
if err := checkDurName(consumer); err != nil {
if err := checkConsumerName(consumer); err != nil {
return err
}

Expand Down Expand Up @@ -2757,10 +2750,13 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, fmt.Errorf("nats: %s", info.Error.Description)
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, info.Error
}
return info.ConsumerInfo, nil
}
Expand Down
130 changes: 87 additions & 43 deletions jsm.go
Expand Up @@ -156,9 +156,9 @@ type ExternalStream struct {

// APIError is included in all API responses if there was an error.
type APIError struct {
Code int `json:"code"`
ErrorCode int `json:"err_code"`
Description string `json:"description,omitempty"`
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// apiResponse is a standard response from the JetStream JSON API
Expand Down Expand Up @@ -219,6 +219,27 @@ type accountInfoResponse struct {
AccountInfo
}

type ErrorCode uint16

const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076

JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105

JSErrCodeMessageNotFound ErrorCode = 10037
)

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
Expand All @@ -244,13 +265,13 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
return nil, err
}
if info.Error != nil {
var err error
if strings.Contains(info.Error.Description, "not enabled for") {
err = ErrJetStreamNotEnabled
} else {
err = errors.New(info.Error.Description)
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabledForAccount {
return nil, ErrJetStreamNotEnabledForAccount
}
return nil, err
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabled {
return nil, ErrJetStreamNotEnabled
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the server not enabled for account and not enabled looks like they are two different errors actually... maybe we should start distinguishing them too and use ErrJetStreamNotEnabledForAccount?

https://github.com/nats-io/nats-server/blob/52c4872666a26acdf475cb89ae9b5f367324fadd/server/jetstream_errors_generated.go#L472-L474

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another factor why maybe we should clean it up is that JetStreamNotEnabledForAccount is a hard error since means that account can never use JetStream in current setup, whereas things are a bit less certain in ErrJetStreamNotEnabled, it can be either a temporary error due to JS not ready yet.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, distinguishing these errors is a good idea.

}
return nil, info.Error
}

return &info.AccountInfo, nil
Expand All @@ -268,6 +289,34 @@ type consumerResponse struct {

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg != nil && cfg.Durable != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, cfg.Durable)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is good to call Info before, but also changes the behavior slightly since need to give permissions to info in across account cases which they probably would have anyway.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would lean towards leaving that, as without that AddConsumer() I feel does not work as the name suggests (it will simply update a consumer if it already exists).

if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}

if consInfo != nil {
sameConfig := checkConfig(&consInfo.Config, cfg)
if sameConfig != nil {
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream)
}
}
}

return js.upsertConsumer(stream, cfg, opts...)
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrConsumerNameRequired
}
return js.upsertConsumer(stream, cfg, opts...)
}

func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
Expand All @@ -286,7 +335,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C

var ccSubj string
if cfg != nil && cfg.Durable != _EMPTY_ {
if err := checkDurName(cfg.Durable); err != nil {
if err := checkConsumerName(cfg.Durable); err != nil {
return nil, err
}
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
Expand All @@ -307,30 +356,17 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == 10059 {
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
if info.Error.Code == 404 {
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, errors.New(info.Error.Description)
return nil, info.Error
}
return info.ConsumerInfo, nil
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrInvalidDurableName
}
return js.AddConsumer(stream, cfg, opts...)
}

// consumerDeleteResponse is the response for a Consumer delete request.
type consumerDeleteResponse struct {
apiResponse
Expand All @@ -347,6 +383,8 @@ func checkStreamName(stream string) error {
return nil
}

// Check that the durable name exists and is valid, that is, that it does not contain any "."
// Returns ErrConsumerNameRequired if consumer name is empty, ErrInvalidConsumerName is invalid, otherwise nil
func checkConsumerName(consumer string) error {
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
Expand Down Expand Up @@ -384,10 +422,10 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return ErrConsumerNotFound
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -474,7 +512,7 @@ func (c *consumerLister) Next() bool {
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
c.err = resp.Error
return false
}

Expand Down Expand Up @@ -571,7 +609,7 @@ func (c *consumerNamesLister) Next() bool {
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
c.err = resp.Error
return false
}

Expand Down Expand Up @@ -655,10 +693,10 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == 10058 {
if resp.Error.ErrorCode == JSErrCodeStreamNameInUse {
return nil, ErrStreamNameAlreadyInUse
}
return nil, errors.New(resp.Error.Description)
return nil, resp.Error
}

return resp.StreamInfo, nil
Expand Down Expand Up @@ -703,10 +741,10 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
return nil, resp.Error
}

return resp.StreamInfo, nil
Expand Down Expand Up @@ -795,7 +833,10 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}
return resp.StreamInfo, nil
}
Expand Down Expand Up @@ -830,10 +871,10 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return ErrStreamNotFound
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -934,10 +975,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") {
if resp.Error.ErrorCode == JSErrCodeMessageNotFound {
return nil, ErrMsgNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

msg := resp.Message
Expand Down Expand Up @@ -1082,7 +1126,7 @@ func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteReques
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -1148,7 +1192,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt)
if resp.Error.Code == 400 {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -1215,7 +1259,7 @@ func (s *streamLister) Next() bool {
return false
}
if resp.Error != nil {
s.err = errors.New(resp.Error.Description)
s.err = resp.Error
return false
}

Expand Down Expand Up @@ -1299,7 +1343,7 @@ func (l *streamNamesLister) Next() bool {
return false
}
if resp.Error != nil {
l.err = errors.New(resp.Error.Description)
l.err = resp.Error
return false
}

Expand Down