Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add two new sentinel errors. ErrNoStream and ErrNoConsumer #760

Merged
merged 3 commits into from Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion js.go
Expand Up @@ -1048,7 +1048,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// to which it should be attaching to.
if consumer != _EMPTY_ {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found")
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
}

Expand Down Expand Up @@ -1194,6 +1194,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
attached = true
} else {
if cinfo.Error.Code == 404 {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
}
Expand Down Expand Up @@ -1355,6 +1358,7 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) {
if err := json.Unmarshal(resp.Data, &slr); err != nil {
return _EMPTY_, err
}

if slr.Error != nil || len(slr.Streams) != 1 {
return _EMPTY_, ErrNoMatchingStream
}
Expand Down Expand Up @@ -1889,6 +1893,9 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
return nil, ErrConsumerNotFound
}
return nil, fmt.Errorf("nats: %s", info.Error.Description)
}
return info.ConsumerInfo, nil
Expand Down
16 changes: 16 additions & 0 deletions jsm.go
Expand Up @@ -258,6 +258,9 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
return nil, ErrConsumerNotFound
}
return nil, errors.New(info.Error.Description)
}
return info.ConsumerInfo, nil
Expand Down Expand Up @@ -292,7 +295,11 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}

if resp.Error != nil {
if resp.Error.Code == 404 {
return ErrConsumerNotFound
}
return errors.New(resp.Error.Description)
}
return nil
Expand Down Expand Up @@ -559,6 +566,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}

return resp.StreamInfo, nil
}

Expand Down Expand Up @@ -587,8 +595,12 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
return nil, ErrStreamNotFound
}
return nil, errors.New(resp.Error.Description)
}

return resp.StreamInfo, nil
}

Expand Down Expand Up @@ -701,7 +713,11 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}

if resp.Error != nil {
if resp.Error.Code == 404 {
return ErrStreamNotFound
}
return errors.New(resp.Error.Description)
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions nats.go
Expand Up @@ -146,6 +146,8 @@ var (
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrStreamNotFound = errors.New("stream not found")
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
ErrConsumerNotFound = errors.New("consumer not found")
wallyqs marked this conversation as resolved.
Show resolved Hide resolved
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
Expand Down
39 changes: 30 additions & 9 deletions test/js_test.go
Expand Up @@ -16,6 +16,7 @@ package test
import (
"context"
"crypto/rand"
"errors"
"fmt"
"io/ioutil"
"net"
Expand Down Expand Up @@ -1742,6 +1743,16 @@ func TestJetStreamManagement(t *testing.T) {
js.Publish("foo", []byte("hi"))
}

t.Run("stream not found", func(t *testing.T) {
si, err = js.StreamInfo("bar")
if !errors.Is(err, nats.ErrStreamNotFound) {
t.Fatalf("Expected error: %v, got: %v", nats.ErrStreamNotFound, err)
}
if si != nil {
t.Fatalf("StreamInfo should be nil %+v", si)
}
})

t.Run("stream info", func(t *testing.T) {
si, err = js.StreamInfo("foo")
if err != nil {
Expand Down Expand Up @@ -1804,6 +1815,16 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("consumer not found", func(t *testing.T) {
ci, err := js.ConsumerInfo("foo", "cld")
if !errors.Is(err, nats.ErrConsumerNotFound) {
t.Fatalf("Expected error: %v, got: %v", nats.ErrConsumerNotFound, err)
}
if ci != nil {
t.Fatalf("ConsumerInfo should be nil %+v", ci)
}
})

t.Run("list streams", func(t *testing.T) {
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
Expand Down Expand Up @@ -1905,11 +1926,11 @@ func TestJetStreamManagement(t *testing.T) {
if info.Limits.MaxConsumers != -1 {
t.Errorf("Expected to not have consumer limits, got: %v", info.Limits.MaxConsumers)
}
if info.API.Total != 14 {
t.Errorf("Expected 15 API calls, got: %v", info.API.Total)
if info.API.Total != 16 {
t.Errorf("Expected 16 API calls, got: %v", info.API.Total)
}
if info.API.Errors != 1 {
t.Errorf("Expected 11 API error, got: %v", info.API.Errors)
if info.API.Errors != 3 {
t.Errorf("Expected 3 API error, got: %v", info.API.Errors)
}
})
}
Expand Down Expand Up @@ -3881,7 +3902,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
if err == nil {
t.Fatalf("Unexpected success")
}
if err.Error() != `consumer not found` {
if !errors.Is(err, nats.ErrConsumerNotFound) {
t.Errorf("Expected consumer not found error, got: %v", err)
}

Expand Down Expand Up @@ -4587,8 +4608,8 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
if err == nil {
t.Fatal("Unexpected success")
}
if err.Error() != `nats: stream not found` {
t.Fatal("Expected stream not found error")
if !errors.Is(err, nats.ErrStreamNotFound) {
t.Fatal("Expected stream not found error", err.Error())
}
})

Expand Down Expand Up @@ -6099,13 +6120,13 @@ func TestJetStreamBindConsumer(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.SubscribeSync("foo", nats.Bind("foo", "push"))
if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") {
if err == nil || !errors.Is(err, nats.ErrConsumerNotFound) {
t.Fatalf("Unexpected error: %v", err)
}

// Pull consumer
_, err = js.PullSubscribe("foo", "pull", nats.Bind("foo", "pull"))
if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") {
if err == nil || !errors.Is(err, nats.ErrConsumerNotFound) {
t.Fatalf("Unexpected error: %v", err)
}

Expand Down