Skip to content

Commit

Permalink
squashed commit, adding sentinel errors ErrStreamNotFound and ErrCons…
Browse files Browse the repository at this point in the history
…umerNotFound
  • Loading branch information
actatum committed Jun 24, 2021
1 parent 4b75fc5 commit 9b8ff38
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 10 deletions.
9 changes: 8 additions & 1 deletion js.go
Expand Up @@ -1048,7 +1048,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// to which it should be attaching to.
if consumer != _EMPTY_ {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = err != nil && strings.Contains(err.Error(), "consumer not found")
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
}

Expand Down Expand Up @@ -1194,6 +1194,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}
attached = true
} else {
if cinfo.Error.Code == 404 {
return nil, ErrStreamNotFound
}
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
}
Expand Down Expand Up @@ -1355,6 +1358,7 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) {
if err := json.Unmarshal(resp.Data, &slr); err != nil {
return _EMPTY_, err
}

if slr.Error != nil || len(slr.Streams) != 1 {
return _EMPTY_, ErrNoMatchingStream
}
Expand Down Expand Up @@ -1889,6 +1893,9 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
return nil, ErrConsumerNotFound
}
return nil, fmt.Errorf("nats: %s", info.Error.Description)
}
return info.ConsumerInfo, nil
Expand Down
16 changes: 16 additions & 0 deletions jsm.go
Expand Up @@ -258,6 +258,9 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
return nil, err
}
if info.Error != nil {
if info.Error.Code == 404 {
return nil, ErrConsumerNotFound
}
return nil, errors.New(info.Error.Description)
}
return info.ConsumerInfo, nil
Expand Down Expand Up @@ -292,7 +295,11 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}

if resp.Error != nil {
if resp.Error.Code == 404 {
return ErrConsumerNotFound
}
return errors.New(resp.Error.Description)
}
return nil
Expand Down Expand Up @@ -559,6 +566,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}

return resp.StreamInfo, nil
}

Expand Down Expand Up @@ -587,8 +595,12 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
return nil, err
}
if resp.Error != nil {
if resp.Error.Code == 404 {
return nil, ErrStreamNotFound
}
return nil, errors.New(resp.Error.Description)
}

return resp.StreamInfo, nil
}

Expand Down Expand Up @@ -701,7 +713,11 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}

if resp.Error != nil {
if resp.Error.Code == 404 {
return ErrStreamNotFound
}
return errors.New(resp.Error.Description)
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions nats.go
Expand Up @@ -146,6 +146,8 @@ var (
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrStreamNotFound = errors.New("stream not found")
ErrConsumerNotFound = errors.New("consumer not found")
ErrConsumerNameRequired = errors.New("nats: consumer name is required")
ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
Expand Down
39 changes: 30 additions & 9 deletions test/js_test.go
Expand Up @@ -16,6 +16,7 @@ package test
import (
"context"
"crypto/rand"
"errors"
"fmt"
"io/ioutil"
"net"
Expand Down Expand Up @@ -1742,6 +1743,16 @@ func TestJetStreamManagement(t *testing.T) {
js.Publish("foo", []byte("hi"))
}

t.Run("stream not found", func(t *testing.T) {
si, err = js.StreamInfo("bar")
if !errors.Is(err, nats.ErrStreamNotFound) {
t.Fatalf("Expected error: %v, got: %v", nats.ErrStreamNotFound, err)
}
if si != nil {
t.Fatalf("StreamInfo should be nil %+v", si)
}
})

t.Run("stream info", func(t *testing.T) {
si, err = js.StreamInfo("foo")
if err != nil {
Expand Down Expand Up @@ -1804,6 +1815,16 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("consumer not found", func(t *testing.T) {
ci, err := js.ConsumerInfo("foo", "cld")
if !errors.Is(err, nats.ErrConsumerNotFound) {
t.Fatalf("Expected error: %v, got: %v", nats.ErrConsumerNotFound, err)
}
if ci != nil {
t.Fatalf("ConsumerInfo should be nil %+v", ci)
}
})

t.Run("list streams", func(t *testing.T) {
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
Expand Down Expand Up @@ -1905,11 +1926,11 @@ func TestJetStreamManagement(t *testing.T) {
if info.Limits.MaxConsumers != -1 {
t.Errorf("Expected to not have consumer limits, got: %v", info.Limits.MaxConsumers)
}
if info.API.Total != 14 {
t.Errorf("Expected 15 API calls, got: %v", info.API.Total)
if info.API.Total != 16 {
t.Errorf("Expected 16 API calls, got: %v", info.API.Total)
}
if info.API.Errors != 1 {
t.Errorf("Expected 11 API error, got: %v", info.API.Errors)
if info.API.Errors != 3 {
t.Errorf("Expected 3 API error, got: %v", info.API.Errors)
}
})
}
Expand Down Expand Up @@ -3881,7 +3902,7 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
if err == nil {
t.Fatalf("Unexpected success")
}
if err.Error() != `consumer not found` {
if !errors.Is(err, nats.ErrConsumerNotFound) {
t.Errorf("Expected consumer not found error, got: %v", err)
}

Expand Down Expand Up @@ -4587,8 +4608,8 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
if err == nil {
t.Fatal("Unexpected success")
}
if err.Error() != `nats: stream not found` {
t.Fatal("Expected stream not found error")
if !errors.Is(err, nats.ErrStreamNotFound) {
t.Fatal("Expected stream not found error", err.Error())
}
})

Expand Down Expand Up @@ -6099,13 +6120,13 @@ func TestJetStreamBindConsumer(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
_, err = js.SubscribeSync("foo", nats.Bind("foo", "push"))
if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") {
if err == nil || !errors.Is(err, nats.ErrConsumerNotFound) {
t.Fatalf("Unexpected error: %v", err)
}

// Pull consumer
_, err = js.PullSubscribe("foo", "pull", nats.Bind("foo", "pull"))
if err == nil || err != nil && !strings.Contains(err.Error(), "consumer not found") {
if err == nil || !errors.Is(err, nats.ErrConsumerNotFound) {
t.Fatalf("Unexpected error: %v", err)
}

Expand Down

0 comments on commit 9b8ff38

Please sign in to comment.