diff --git a/js.go b/js.go index 7a0b8e474..7ee9f9f4d 100644 --- a/js.go +++ b/js.go @@ -1048,7 +1048,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync // to which it should be attaching to. if consumer != _EMPTY_ { info, err = js.ConsumerInfo(stream, consumer) - notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found") + notFoundErr = errors.Is(err, ErrConsumerNotFound) lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded } @@ -1194,6 +1194,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync } attached = true } else { + if cinfo.Error.Code == 404 { + return nil, ErrStreamNotFound + } return nil, fmt.Errorf("nats: %s", cinfo.Error.Description) } } @@ -1355,6 +1358,7 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) { if err := json.Unmarshal(resp.Data, &slr); err != nil { return _EMPTY_, err } + if slr.Error != nil || len(slr.Streams) != 1 { return _EMPTY_, ErrNoMatchingStream } @@ -1889,6 +1893,9 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin return nil, err } if info.Error != nil { + if info.Error.Code == 404 { + return nil, ErrConsumerNotFound + } return nil, fmt.Errorf("nats: %s", info.Error.Description) } return info.ConsumerInfo, nil diff --git a/jsm.go b/jsm.go index e485ae14a..db38ff825 100644 --- a/jsm.go +++ b/jsm.go @@ -258,6 +258,9 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C return nil, err } if info.Error != nil { + if info.Error.Code == 404 { + return nil, ErrConsumerNotFound + } return nil, errors.New(info.Error.Description) } return info.ConsumerInfo, nil @@ -292,7 +295,11 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { if err := json.Unmarshal(r.Data, &resp); err != nil { return err } + if resp.Error != nil { + if resp.Error.Code == 404 { + return ErrConsumerNotFound + } return errors.New(resp.Error.Description) } return nil @@ -559,6 +566,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { if resp.Error != nil { return nil, errors.New(resp.Error.Description) } + return resp.StreamInfo, nil } @@ -587,8 +595,12 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { return nil, err } if resp.Error != nil { + if resp.Error.Code == 404 { + return nil, ErrStreamNotFound + } return nil, errors.New(resp.Error.Description) } + return resp.StreamInfo, nil } @@ -701,7 +713,11 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { if err := json.Unmarshal(r.Data, &resp); err != nil { return err } + if resp.Error != nil { + if resp.Error.Code == 404 { + return ErrStreamNotFound + } return errors.New(resp.Error.Description) } return nil diff --git a/nats.go b/nats.go index 502168340..6e5410bda 100644 --- a/nats.go +++ b/nats.go @@ -146,6 +146,8 @@ var ( ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") ErrStreamNameRequired = errors.New("nats: stream name is required") + ErrStreamNotFound = errors.New("nats: stream not found") + ErrConsumerNotFound = errors.New("nats: consumer not found") ErrConsumerNameRequired = errors.New("nats: consumer name is required") ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") diff --git a/test/js_test.go b/test/js_test.go index a9367f698..fba2a4691 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -16,6 +16,7 @@ package test import ( "context" "crypto/rand" + "errors" "fmt" "io/ioutil" "net" @@ -1742,6 +1743,16 @@ func TestJetStreamManagement(t *testing.T) { js.Publish("foo", []byte("hi")) } + t.Run("stream not found", func(t *testing.T) { + si, err = js.StreamInfo("bar") + if !errors.Is(err, nats.ErrStreamNotFound) { + t.Fatalf("Expected error: %v, got: %v", nats.ErrStreamNotFound, err) + } + if si != nil { + t.Fatalf("StreamInfo should be nil %+v", si) + } + }) + t.Run("stream info", func(t *testing.T) { si, err = js.StreamInfo("foo") if err != nil { @@ -1804,6 +1815,16 @@ func TestJetStreamManagement(t *testing.T) { } }) + t.Run("consumer not found", func(t *testing.T) { + ci, err := js.ConsumerInfo("foo", "cld") + if !errors.Is(err, nats.ErrConsumerNotFound) { + t.Fatalf("Expected error: %v, got: %v", nats.ErrConsumerNotFound, err) + } + if ci != nil { + t.Fatalf("ConsumerInfo should be nil %+v", ci) + } + }) + t.Run("list streams", func(t *testing.T) { var infos []*nats.StreamInfo for info := range js.StreamsInfo() { @@ -1905,11 +1926,11 @@ func TestJetStreamManagement(t *testing.T) { if info.Limits.MaxConsumers != -1 { t.Errorf("Expected to not have consumer limits, got: %v", info.Limits.MaxConsumers) } - if info.API.Total != 14 { - t.Errorf("Expected 15 API calls, got: %v", info.API.Total) + if info.API.Total != 16 { + t.Errorf("Expected 16 API calls, got: %v", info.API.Total) } - if info.API.Errors != 1 { - t.Errorf("Expected 11 API error, got: %v", info.API.Errors) + if info.API.Errors != 3 { + t.Errorf("Expected 3 API error, got: %v", info.API.Errors) } }) } @@ -3881,7 +3902,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { if err == nil { t.Fatalf("Unexpected success") } - if err.Error() != `consumer not found` { + if !errors.Is(err, nats.ErrConsumerNotFound) { t.Errorf("Expected consumer not found error, got: %v", err) } @@ -4587,8 +4608,8 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) { if err == nil { t.Fatal("Unexpected success") } - if err.Error() != `nats: stream not found` { - t.Fatal("Expected stream not found error") + if !errors.Is(err, nats.ErrStreamNotFound) { + t.Fatal("Expected stream not found error", err.Error()) } }) @@ -6099,13 +6120,13 @@ func TestJetStreamBindConsumer(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } _, err = js.SubscribeSync("foo", nats.Bind("foo", "push")) - if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") { + if err == nil || !errors.Is(err, nats.ErrConsumerNotFound) { t.Fatalf("Unexpected error: %v", err) } // Pull consumer _, err = js.PullSubscribe("foo", "pull", nats.Bind("foo", "pull")) - if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") { + if err == nil || !errors.Is(err, nats.ErrConsumerNotFound) { t.Fatalf("Unexpected error: %v", err) }