Skip to content

Commit

Permalink
Use JetStream error codes, extract ErrConsumerNameAlredyInUse
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Aug 11, 2022
1 parent fb5ca2c commit 4d7a668
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 87 deletions.
40 changes: 18 additions & 22 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
return nil, ErrInvalidJSAck
}
if pa.Error != nil {
return nil, fmt.Errorf("nats: %s", pa.Error.Description)
return nil, pa.Error
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
return nil, ErrInvalidJSAck
Expand Down Expand Up @@ -728,7 +728,7 @@ func (js *js) handleAsyncReply(m *Msg) {
return
}
if pa.Error != nil {
doErr(fmt.Errorf("nats: %s", pa.Error.Description))
doErr(pa.Error)
return
}
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
Expand Down Expand Up @@ -1228,7 +1228,10 @@ func (js *js) ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOp
// See important note in Subscribe()
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, append(opts, Durable(durable)))
if durable != "" {
opts = append(opts, Durable(durable))
}
return js.subscribe(subj, _EMPTY_, nil, mch, true, true, opts)
}

func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode bool, subj, queue string) (string, error) {
Expand Down Expand Up @@ -1394,7 +1397,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// If this is a queue subscription and no consumer nor durable name was specified,
// then we will use the queue name as a durable name.
if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ {
if err := checkDurName(queue); err != nil {
if err := checkConsumerName(queue); err != nil {
return nil, err
}
o.cfg.Durable = queue
Expand Down Expand Up @@ -1651,8 +1654,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
cleanUpSub()
}
if consumer != _EMPTY_ &&
(strings.Contains(cinfo.Error.Description, `consumer already exists`) ||
strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
(cinfo.Error.ErrorCode == ConsumerAlreadyExists || cinfo.Error.ErrorCode == ConsumerNameExists) {

info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
Expand Down Expand Up @@ -1689,10 +1691,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.Code == 404 {
if cinfo.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
return nil, cinfo.Error
}
} else {
// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
Expand Down Expand Up @@ -1970,7 +1972,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
}

if cinfo.Error != nil {
pushErr(fmt.Errorf("nats: %s", cinfo.Error.Description))
pushErr(cinfo.Error)
return
}

Expand Down Expand Up @@ -2185,17 +2187,8 @@ func Description(description string) SubOpt {
})
}

// Check that the durable name is valid, that is, that it does not contain
// any ".", and if it does return ErrInvalidDurableName, otherwise nil.
func checkDurName(dur string) error {
if strings.Contains(dur, ".") {
return ErrInvalidDurableName
}
return nil
}

// Durable defines the consumer name for JetStream durable subscribers.
// This function will return ErrInvalidDurableName in the name contains
// This function will return ErrInvalidConsumerName in the name contains
// any dot ".".
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
Expand All @@ -2205,7 +2198,7 @@ func Durable(consumer string) SubOpt {
if opts.consumer != _EMPTY_ && opts.consumer != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
}
if err := checkDurName(consumer); err != nil {
if err := checkConsumerName(consumer); err != nil {
return err
}

Expand Down Expand Up @@ -2757,10 +2750,13 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
if info.Error.ErrorCode == ConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, fmt.Errorf("nats: %s", info.Error.Description)
if info.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, info.Error
}
return info.ConsumerInfo, nil
}
Expand Down
122 changes: 79 additions & 43 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ type ExternalStream struct {

// APIError is included in all API responses if there was an error.
type APIError struct {
Code int `json:"code"`
ErrorCode int `json:"err_code"`
Description string `json:"description,omitempty"`
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// apiResponse is a standard response from the JetStream JSON API
Expand Down Expand Up @@ -219,6 +219,26 @@ type accountInfoResponse struct {
AccountInfo
}

type ErrorCode uint16

const (
JetStreamNotEnabledForAccount ErrorCode = 10039

StreamNotFound ErrorCode = 10059
StreamNameInUse ErrorCode = 10058

ConsumerNotFound ErrorCode = 10014
ConsumerNameExists ErrorCode = 10013
ConsumerAlreadyExists ErrorCode = 10105

MessageNotFound ErrorCode = 10037
)

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
Expand All @@ -244,13 +264,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
return nil, err
}
if info.Error != nil {
var err error
if strings.Contains(info.Error.Description, "not enabled for") {
err = ErrJetStreamNotEnabled
} else {
err = errors.New(info.Error.Description)
if info.Error.ErrorCode == JetStreamNotEnabledForAccount {
return nil, ErrJetStreamNotEnabled
}
return nil, err
return nil, info.Error
}

return &info.AccountInfo, nil
Expand All @@ -268,6 +285,30 @@ type consumerResponse struct {

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg != nil && cfg.Durable != _EMPTY_ {
consInfo, err := js.ConsumerInfo(stream, cfg.Durable)
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}
if consInfo != nil {
return nil, fmt.Errorf("creating consumer %q on stream %q: %w", cfg.Durable, stream, ErrConsumerNameAlreadyInUse)
}
}

return js.upsertConsumer(stream, cfg, opts...)
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrConsumerNameRequired
}
return js.upsertConsumer(stream, cfg, opts...)
}

func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
Expand All @@ -286,7 +327,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C

var ccSubj string
if cfg != nil && cfg.Durable != _EMPTY_ {
if err := checkDurName(cfg.Durable); err != nil {
if err := checkConsumerName(cfg.Durable); err != nil {
return nil, err
}
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
Expand All @@ -307,30 +348,17 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == 10059 {
if info.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
if info.Error.Code == 404 {
if info.Error.ErrorCode == ConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, errors.New(info.Error.Description)
return nil, info.Error
}
return info.ConsumerInfo, nil
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
if cfg.Durable == _EMPTY_ {
return nil, ErrInvalidDurableName
}
return js.AddConsumer(stream, cfg, opts...)
}

// consumerDeleteResponse is the response for a Consumer delete request.
type consumerDeleteResponse struct {
apiResponse
Expand All @@ -347,6 +375,8 @@ func checkStreamName(stream string) error {
return nil
}

// Check that the durable name exists and is valid, that is, that it does not contain any "."
// Returns ErrConsumerNameRequired if consumer name is empty, ErrInvalidConsumerName is invalid, otherwise nil
func checkConsumerName(consumer string) error {
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
Expand Down Expand Up @@ -384,10 +414,10 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == ConsumerNotFound {
return ErrConsumerNotFound
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -474,7 +504,7 @@ func (c *consumerLister) Next() bool {
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
c.err = resp.Error
return false
}

Expand Down Expand Up @@ -571,7 +601,7 @@ func (c *consumerNamesLister) Next() bool {
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
c.err = resp.Error
return false
}

Expand Down Expand Up @@ -655,10 +685,10 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == 10058 {
if resp.Error.ErrorCode == StreamNameInUse {
return nil, ErrStreamNameAlreadyInUse
}
return nil, errors.New(resp.Error.Description)
return nil, resp.Error
}

return resp.StreamInfo, nil
Expand Down Expand Up @@ -703,10 +733,10 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
return nil, resp.Error
}

return resp.StreamInfo, nil
Expand Down Expand Up @@ -795,7 +825,10 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
if resp.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}
return resp.StreamInfo, nil
}
Expand Down Expand Up @@ -830,10 +863,10 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.Code == 404 {
if resp.Error.ErrorCode == StreamNotFound {
return ErrStreamNotFound
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -934,10 +967,13 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") {
if resp.Error.ErrorCode == MessageNotFound {
return nil, ErrMsgNotFound
}
return nil, fmt.Errorf("nats: %s", resp.Error.Description)
if resp.Error.ErrorCode == StreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

msg := resp.Message
Expand Down Expand Up @@ -1082,7 +1118,7 @@ func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteReques
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -1148,7 +1184,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt)
if resp.Error.Code == 400 {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return errors.New(resp.Error.Description)
return resp.Error
}
return nil
}
Expand Down Expand Up @@ -1215,7 +1251,7 @@ func (s *streamLister) Next() bool {
return false
}
if resp.Error != nil {
s.err = errors.New(resp.Error.Description)
s.err = resp.Error
return false
}

Expand Down Expand Up @@ -1299,7 +1335,7 @@ func (l *streamNamesLister) Next() bool {
return false
}
if resp.Error != nil {
l.err = errors.New(resp.Error.Description)
l.err = resp.Error
return false
}

Expand Down

0 comments on commit 4d7a668

Please sign in to comment.