Skip to content

Commit

Permalink
Merge pull request #1044 from nats-io/js-error-codes
Browse files Browse the repository at this point in the history
[IMPROVED] Add JetStream error codes, extract ErrConsumerNameAlreadyInUse
  • Loading branch information
kozlovic committed Aug 16, 2022
2 parents ea2caaa + 82272a0 commit b81c9e7
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 167 deletions.
40 changes: 18 additions & 22 deletions js.go
Expand Up @@ -524,7 +524,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
return nil, ErrInvalidJSAck
}
if pa.Error != nil {
return nil, fmt.Errorf("nats: %s", pa.Error.Description)
return nil, pa.Error
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
return nil, ErrInvalidJSAck
Expand Down Expand Up @@ -728,7 +728,7 @@ func (js *js) handleAsyncReply(m *Msg) {
return
}
if pa.Error != nil {
doErr(fmt.Errorf("nats: %s", pa.Error.Description))
doErr(pa.Error)
return
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
Expand Down Expand Up @@ -1230,7 +1230,10 @@ func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOp
// See important note in Subscribe()
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))
if durable != "" {
opts = append(opts, Durable(durable))
}
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, opts)
}

func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
Expand Down Expand Up @@ -1396,7 +1399,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// If this is a queue subscription and no consumer nor durable name was specified,
// then we will use the queue name as a durable name.
if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ {
if err := checkDurName(queue); err != nil {
if err := checkConsumerName(queue); err != nil {
return nil, err
}
o.cfg.Durable = queue
Expand Down Expand Up @@ -1653,8 +1656,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
cleanUpSub()
}
if consumer != _EMPTY_ &&
(strings.Contains(cinfo.Error.Description, `consumer already exists`) ||
strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
(cinfo.Error.ErrorCode == JSErrCodeConsumerAlreadyExists || cinfo.Error.ErrorCode == JSErrCodeConsumerNameExists) {

info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
Expand Down Expand Up @@ -1691,10 +1693,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.Code == 404 {
if cinfo.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
return nil, cinfo.Error
}
} else {
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
Expand Down Expand Up @@ -1972,7 +1974,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
}

if cinfo.Error != nil {
pushErr(fmt.Errorf("nats: %s", cinfo.Error.Description))
pushErr(cinfo.Error)
return
}

Expand Down Expand Up @@ -2187,17 +2189,8 @@ func Description(description string) SubOpt {
})
}

// Check that the durable name is valid, that is, that it does not contain
// any ".", and if it does return ErrInvalidDurableName, otherwise nil.
func checkDurName(dur string) error {
if strings.Contains(dur, ".") {
return ErrInvalidDurableName
}
return nil
}

// Durable defines the consumer name for JetStream durable subscribers.
// This function will return ErrInvalidDurableName in the name contains
// This function will return ErrInvalidConsumerName in the name contains
// any dot ".".
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand All @@ -2207,7 +2200,7 @@ func Durable(consumer string) SubOpt {
if opts.consumer != _EMPTY_ && opts.consumer != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
}
if err := checkDurName(consumer); err != nil {
if err := checkConsumerName(consumer); err != nil {
return err
}

Expand Down Expand Up @@ -2779,10 +2772,13 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, fmt.Errorf("nats: %s", info.Error.Description)
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, info.Error
}
return info.ConsumerInfo, nil
}
Expand Down
130 changes: 87 additions & 43 deletions jsm.go
Expand Up @@ -156,9 +156,9 @@ type ExternalStream struct {

// APIError is included in all API responses if there was an error.
type APIError struct {
Code int `json:"code"`
ErrorCode int `json:"err_code"`
Description string `json:"description,omitempty"`
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// apiResponse is a standard response from the JetStream JSON API
Expand Down Expand Up @@ -219,6 +219,27 @@ type accountInfoResponse struct {
AccountInfo
}

type ErrorCode uint16

const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076

JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105

JSErrCodeMessageNotFound ErrorCode = 10037
)

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
Expand All @@ -244,13 +265,13 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
return nil, err
}
if info.Error != nil {
var err error
if strings.Contains(info.Error.Description, "not enabled for") {
err = ErrJetStreamNotEnabled
} else {
err = errors.New(info.Error.Description)
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabledForAccount {
return nil, ErrJetStreamNotEnabledForAccount
}
return nil, err
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabled {
return nil, ErrJetStreamNotEnabled
}
return nil, info.Error
}

return &info.AccountInfo, nil
Expand All @@ -268,6 +289,34 @@ type consumerResponse struct {

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg != nil && cfg.Durable != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, cfg.Durable)
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}

if consInfo != nil {
sameConfig := checkConfig(&consInfo.Config, cfg)
if sameConfig != nil {
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream)
}
}
}

return js.upsertConsumer(stream, cfg, opts...)
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrConsumerNameRequired
}
return js.upsertConsumer(stream, cfg, opts...)
}

func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
Expand All @@ -286,7 +335,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C

var ccSubj string
if cfg != nil && cfg.Durable != _EMPTY_ {
if err := checkDurName(cfg.Durable); err != nil {
if err := checkConsumerName(cfg.Durable); err != nil {
return nil, err
}
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
Expand All @@ -307,30 +356,17 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == 10059 {
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
if info.Error.Code == 404 {
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, errors.New(info.Error.Description)
return nil, info.Error
}
return info.ConsumerInfo, nil
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrInvalidDurableName
}
return js.AddConsumer(stream, cfg, opts...)
}

// consumerDeleteResponse is the response for a Consumer delete request.
type consumerDeleteResponse struct {
apiResponse
Expand All @@ -347,6 +383,8 @@ func checkStreamName(stream string) error {
return nil
}

// Check that the durable name exists and is valid, that is, that it does not contain any "."
// Returns ErrConsumerNameRequired if consumer name is empty, ErrInvalidConsumerName is invalid, otherwise nil
func checkConsumerName(consumer string) error {
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
Expand Down Expand Up @@ -384,10 +422,10 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return ErrConsumerNotFound
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -474,7 +512,7 @@ func (c *consumerLister) Next() bool {
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
c.err = resp.Error
return false
}

Expand Down Expand Up @@ -571,7 +609,7 @@ func (c *consumerNamesLister) Next() bool {
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
c.err = resp.Error
return false
}

Expand Down Expand Up @@ -655,10 +693,10 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == 10058 {
if resp.Error.ErrorCode == JSErrCodeStreamNameInUse {
return nil, ErrStreamNameAlreadyInUse
}
return nil, errors.New(resp.Error.Description)
return nil, resp.Error
}

return resp.StreamInfo, nil
Expand Down Expand Up @@ -703,10 +741,10 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
return nil, resp.Error
}

return resp.StreamInfo, nil
Expand Down Expand Up @@ -795,7 +833,10 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}
return resp.StreamInfo, nil
}
Expand Down Expand Up @@ -830,10 +871,10 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return ErrStreamNotFound
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -934,10 +975,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") {
if resp.Error.ErrorCode == JSErrCodeMessageNotFound {
return nil, ErrMsgNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

msg := resp.Message
Expand Down Expand Up @@ -1082,7 +1126,7 @@ func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteReques
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -1148,7 +1192,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt)
if resp.Error.Code == 400 {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -1215,7 +1259,7 @@ func (s *streamLister) Next() bool {
return false
}
if resp.Error != nil {
s.err = errors.New(resp.Error.Description)
s.err = resp.Error
return false
}

Expand Down Expand Up @@ -1299,7 +1343,7 @@ func (l *streamNamesLister) Next() bool {
return false
}
if resp.Error != nil {
l.err = errors.New(resp.Error.Description)
l.err = resp.Error
return false
}

Expand Down

0 comments on commit b81c9e7

Please sign in to comment.