Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Aug 16, 2022
1 parent 4d7a668 commit 82272a0
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 104 deletions.
8 changes: 4 additions & 4 deletions js.go
Expand Up @@ -1654,7 +1654,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
cleanUpSub()
}
if consumer != _EMPTY_ &&
(cinfo.Error.ErrorCode == ConsumerAlreadyExists || cinfo.Error.ErrorCode == ConsumerNameExists) {
(cinfo.Error.ErrorCode == JSErrCodeConsumerAlreadyExists || cinfo.Error.ErrorCode == JSErrCodeConsumerNameExists) {

info, err = js.ConsumerInfo(stream, consumer)
if err != nil {
Expand Down Expand Up @@ -1691,7 +1691,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.ErrorCode == StreamNotFound {
if cinfo.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, cinfo.Error
Expand Down Expand Up @@ -2750,10 +2750,10 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == ConsumerNotFound {
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrConsumerNotFound
}
if info.Error.ErrorCode == StreamNotFound {
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, info.Error
Expand Down
44 changes: 26 additions & 18 deletions jsm.go
Expand Up @@ -222,16 +222,17 @@ type accountInfoResponse struct {
type ErrorCode uint16

const (
JetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076

StreamNotFound ErrorCode = 10059
StreamNameInUse ErrorCode = 10058
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

ConsumerNotFound ErrorCode = 10014
ConsumerNameExists ErrorCode = 10013
ConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105

MessageNotFound ErrorCode = 10037
JSErrCodeMessageNotFound ErrorCode = 10037
)

// Error prints the JetStream API error code and description
Expand Down Expand Up @@ -264,7 +265,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == JetStreamNotEnabledForAccount {
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabledForAccount {
return nil, ErrJetStreamNotEnabledForAccount
}
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabled {
return nil, ErrJetStreamNotEnabled
}
return nil, info.Error
Expand All @@ -290,8 +294,12 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) {
return nil, err
}

if consInfo != nil {
return nil, fmt.Errorf("creating consumer %q on stream %q: %w", cfg.Durable, stream, ErrConsumerNameAlreadyInUse)
sameConfig := checkConfig(&consInfo.Config, cfg)
if sameConfig != nil {
return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, cfg.Durable, stream)
}
}
}

Expand Down Expand Up @@ -348,10 +356,10 @@ func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt)
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == StreamNotFound {
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
if info.Error.ErrorCode == ConsumerNotFound {
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
return nil, ErrConsumerNotFound
}
return nil, info.Error
Expand Down Expand Up @@ -414,7 +422,7 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.ErrorCode == ConsumerNotFound {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
return ErrConsumerNotFound
}
return resp.Error
Expand Down Expand Up @@ -685,7 +693,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == StreamNameInUse {
if resp.Error.ErrorCode == JSErrCodeStreamNameInUse {
return nil, ErrStreamNameAlreadyInUse
}
return nil, resp.Error
Expand Down Expand Up @@ -733,7 +741,7 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == StreamNotFound {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down Expand Up @@ -825,7 +833,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == StreamNotFound {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down Expand Up @@ -863,7 +871,7 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.ErrorCode == StreamNotFound {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return ErrStreamNotFound
}
return resp.Error
Expand Down Expand Up @@ -967,10 +975,10 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == MessageNotFound {
if resp.Error.ErrorCode == JSErrCodeMessageNotFound {
return nil, ErrMsgNotFound
}
if resp.Error.ErrorCode == StreamNotFound {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down
165 changes: 85 additions & 80 deletions nats.go
Expand Up @@ -90,86 +90,91 @@ const (

// Errors
var (
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
ErrNoUserCB = errors.New("nats: user callback not defined")
ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrNoContextOrTimeout = errors.New("nats: no context or timeout given")
ErrPullModeNotAllowed = errors.New("nats: pull based not supported")
ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled")
ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid")
ErrNoStreamResponse = errors.New("nats: no response from stream")
ErrNotJSMessage = errors.New("nats: not a jetstream message")
ErrInvalidStreamName = errors.New("nats: invalid stream name")
ErrInvalidConsumerName = errors.New("nats: invalid consumer name")
ErrNoMatchingStream = errors.New("nats: no stream matches subject")
ErrSubjectMismatch = errors.New("nats: subject does not match consumer")
ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set")
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamConfigRequired = errors.New("nats: stream configuration is required")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrStreamNotFound = errors.New("nats: stream not found")
ErrConsumerNotFound = errors.New("nats: consumer not found")
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer")
ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer")
ErrConsumerNotActive = errors.New("nats: consumer not active")
ErrConsumerNameAlreadyInUse = errors.New("nats: consumer name already in use")
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrBadRequest = errors.New("nats: bad request")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
ErrNoUserCB = errors.New("nats: user callback not defined")
ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrNoContextOrTimeout = errors.New("nats: no context or timeout given")
ErrPullModeNotAllowed = errors.New("nats: pull based not supported")
ErrJetStreamNotEnabledForAccount = errors.New("nats: jetstream not enabled for this account")
ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled")
ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid")
ErrNoStreamResponse = errors.New("nats: no response from stream")
ErrNotJSMessage = errors.New("nats: not a jetstream message")
ErrInvalidStreamName = errors.New("nats: invalid stream name")
ErrInvalidConsumerName = errors.New("nats: invalid consumer name")
ErrNoMatchingStream = errors.New("nats: no stream matches subject")
ErrSubjectMismatch = errors.New("nats: subject does not match consumer")
ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set")
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamConfigRequired = errors.New("nats: stream configuration is required")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrStreamNotFound = errors.New("nats: stream not found")
ErrConsumerNotFound = errors.New("nats: consumer not found")
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer")
ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer")
ErrConsumerNotActive = errors.New("nats: consumer not active")
ErrConsumerNameAlreadyInUse = errors.New("nats: consumer name already in use")
ErrMsgNotFound = errors.New("nats: message not found")
ErrMsgAlreadyAckd = errors.New("nats: message was already acknowledged")
ErrCantAckIfConsumerAckNone = errors.New("nats: cannot acknowledge a message for a consumer with AckNone policy")
ErrStreamInfoMaxSubjects = errors.New("nats: subject details would exceed maximum allowed")
ErrStreamNameAlreadyInUse = errors.New("nats: stream name already in use")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrBadRequest = errors.New("nats: bad request")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases
// Use ErrInvalidConsumerName instead
ErrInvalidDurableName = errors.New("nats: invalid durable name")
)

func init() {
Expand Down

0 comments on commit 82272a0

Please sign in to comment.