Skip to content

Commit

Permalink
js: Add JetStreamError with API response details (#1047)
Browse files Browse the repository at this point in the history
* js: Add JetStreamError with more API response details
* js: Convert errors to JetStreamError and move to separate file

Signed-off-by: Waldemar Quevedo <wally@nats.io>
Co-authored-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
wallyqs and piotrpio committed Aug 23, 2022
1 parent b81c9e7 commit 164805c
Show file tree
Hide file tree
Showing 5 changed files with 408 additions and 152 deletions.
6 changes: 3 additions & 3 deletions js.go
Expand Up @@ -1693,7 +1693,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(cinfo.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, cinfo.Error
Expand Down Expand Up @@ -2772,10 +2772,10 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
if errors.Is(info.Error, ErrConsumerNotFound) {
return nil, ErrConsumerNotFound
}
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(info.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, info.Error
Expand Down
185 changes: 185 additions & 0 deletions jserrors.go
@@ -0,0 +1,185 @@
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
"errors"
"fmt"
)

var (
// API errors

// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account.
ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}}

// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabledForAccount, Description: "jetstream not enabled for account", Code: 503}}

// ErrStreamNotFound is an error returned when stream with given name does not exist.
ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

// ErrMsgNotFound is returned when message with provided sequence number does npt exist.
ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

// ErrBadRequest is returned when invalid request is sent to JetStream API.
ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}}

// Client errors

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}

// ErrConsumerNotActive is an error returned when consumer is not active.
ErrConsumerNotActive JetStreamError = &jsError{message: "consumer not active"}

// ErrInvalidJSAck is returned when JetStream ack from message publish is invalid.
ErrInvalidJSAck JetStreamError = &jsError{message: "invalid jetstream publish response"}

// ErrStreamConfigRequired is returned when empty stream configuration is supplied to add/update stream.
ErrStreamConfigRequired JetStreamError = &jsError{message: "stream configuration is required"}

// ErrStreamNameRequired is returned when the provided stream name is empty.
ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

// ErrConsumerNameRequired is returned when the provided consumer durable name is empty,
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"}

// ErrPullSubscribeToPushConsumer is returned when attempting to use PullSubscribe on push consumer.
ErrPullSubscribeToPushConsumer JetStreamError = &jsError{message: "cannot pull subscribe to push based consumer"}

// ErrPullSubscribeRequired is returned when attempting to use subscribe methods not suitable for pull consumers for pull consumers.
ErrPullSubscribeRequired JetStreamError = &jsError{message: "must use pull subscribe to bind to pull based consumer"}

// ErrMsgAlreadyAckd is returned when attempting to acknowledge message more than once.
ErrMsgAlreadyAckd JetStreamError = &jsError{message: "message was already acknowledged"}

// ErrNoStreamResponse is returned when there is no response from stream (e.g. no responders error).
ErrNoStreamResponse JetStreamError = &jsError{message: "no response from stream"}

// ErrNotJSMessage is returned when attempting to get metadata from non JetStream message .
ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"}

// ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.').
ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"}

// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.').
ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}

// ErrNoMatchingStream is returned when stream lookup by subject is unsuccessful.
ErrNoMatchingStream JetStreamError = &jsError{message: "no stream matches subject"}

// ErrSubjectMismatch is returned when the provided subject does not match consumer's filter subject.
ErrSubjectMismatch JetStreamError = &jsError{message: "subject does not match consumer"}

// ErrContextAndTimeout is returned when attempting to use both context and timeout.
ErrContextAndTimeout JetStreamError = &jsError{message: "context and timeout can not both be set"}

// ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set.
ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases
// Use ErrInvalidConsumerName instead
ErrInvalidDurableName = errors.New("nats: invalid durable name")
)

// Error code represents JetStream error codes returned by the API
type ErrorCode uint16

const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076

JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105

JSErrCodeMessageNotFound ErrorCode = 10037

JSErrCodeBadRequest ErrorCode = 10003
)

// APIError is included in all API responses if there was an error.
type APIError struct {
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// APIError implements the JetStreamError interface.
func (e *APIError) APIError() *APIError {
return e
}

// Is matches against an APIError.
func (e *APIError) Is(err error) bool {
if e == nil {
return false
}
// Extract internal APIError to match against.
var aerr *APIError
ok := errors.As(err, &aerr)
if !ok {
return ok
}
return e.ErrorCode == aerr.ErrorCode
}

// JetStreamError is an error result that happens when using JetStream.
// In case of client-side error, `APIError()` returns nil
type JetStreamError interface {
APIError() *APIError
error
}

type jsError struct {
apiErr *APIError
message string
}

func (err *jsError) APIError() *APIError {
return err.apiErr
}

func (err *jsError) Error() string {
if err.apiErr != nil && err.apiErr.Description != "" {
return err.apiErr.Error()
}
return fmt.Sprintf("nats: %s", err.message)
}

func (err *jsError) Unwrap() error {
// Allow matching to embedded APIError in case there is one.
if err.apiErr == nil {
return nil
}
return err.apiErr
}
54 changes: 12 additions & 42 deletions jsm.go
Expand Up @@ -154,13 +154,6 @@ type ExternalStream struct {
DeliverPrefix string `json:"deliver"`
}

// APIError is included in all API responses if there was an error.
type APIError struct {
Code int `json:"code"`
ErrorCode ErrorCode `json:"err_code"`
Description string `json:"description,omitempty"`
}

// apiResponse is a standard response from the JetStream JSON API
type apiResponse struct {
Type string `json:"type"`
Expand Down Expand Up @@ -219,27 +212,6 @@ type accountInfoResponse struct {
AccountInfo
}

type ErrorCode uint16

const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076

JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105

JSErrCodeMessageNotFound ErrorCode = 10037
)

// Error prints the JetStream API error code and description
func (e *APIError) Error() string {
return fmt.Sprintf("nats: API error %d: %s", e.ErrorCode, e.Description)
}

// AccountInfo retrieves info about the JetStream usage from the current account.
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
// Other errors can happen but are generally considered retryable
Expand All @@ -265,12 +237,10 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabledForAccount {
// Internally checks based on error code instead of description match.
if errors.Is(info.Error, ErrJetStreamNotEnabledForAccount) {
return nil, ErrJetStreamNotEnabledForAccount
}
if info.Error.ErrorCode == JSErrCodeJetStreamNotEnabled {
return nil, ErrJetStreamNotEnabled
}
return nil, info.Error
}

Expand Down Expand Up @@ -356,10 +326,10 @@ func (js *js) upsertConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt)
return nil, err
}
if info.Error != nil {
if info.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(info.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
if info.Error.ErrorCode == JSErrCodeConsumerNotFound {
if errors.Is(info.Error, ErrConsumerNotFound) {
return nil, ErrConsumerNotFound
}
return nil, info.Error
Expand Down Expand Up @@ -422,7 +392,7 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeConsumerNotFound {
if errors.Is(resp.Error, ErrConsumerNotFound) {
return ErrConsumerNotFound
}
return resp.Error
Expand Down Expand Up @@ -693,7 +663,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNameInUse {
if errors.Is(resp.Error, ErrStreamNameAlreadyInUse) {
return nil, ErrStreamNameAlreadyInUse
}
return nil, resp.Error
Expand Down Expand Up @@ -741,7 +711,7 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down Expand Up @@ -833,7 +803,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down Expand Up @@ -871,7 +841,7 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
}

if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(resp.Error, ErrStreamNotFound) {
return ErrStreamNotFound
}
return resp.Error
Expand Down Expand Up @@ -975,10 +945,10 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
return nil, err
}
if resp.Error != nil {
if resp.Error.ErrorCode == JSErrCodeMessageNotFound {
if errors.Is(resp.Error, ErrMsgNotFound) {
return nil, ErrMsgNotFound
}
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
Expand Down Expand Up @@ -1189,7 +1159,7 @@ func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt)
return err
}
if resp.Error != nil {
if resp.Error.Code == 400 {
if errors.Is(resp.Error, ErrBadRequest) {
return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
}
return resp.Error
Expand Down

0 comments on commit 164805c

Please sign in to comment.