diff --git a/.travis.yml b/.travis.yml index aa87e1392..1d05d3f83 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ install: - go get -t ./... - go get github.com/mattn/goveralls - go get github.com/wadey/gocovmerge -- go get -u honnef.co/go/tools/cmd/staticcheck +- go install honnef.co/go/tools/cmd/staticcheck@v0.2.2 - go get -u github.com/client9/misspell/cmd/misspell before_script: - $(exit $(go fmt ./... | wc -l)) diff --git a/jsm.go b/jsm.go index 11545f318..a8c560b65 100644 --- a/jsm.go +++ b/jsm.go @@ -229,6 +229,9 @@ type consumerResponse struct { // AddConsumer will add a JetStream consumer. func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { + if err := checkStreamName(stream); err != nil { + return nil, err + } o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return nil, err @@ -237,9 +240,6 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C defer cancel() } - if stream == _EMPTY_ { - return nil, ErrStreamNameRequired - } req, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg}) if err != nil { return nil, err @@ -280,6 +280,9 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C } func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { + if err := checkStreamName(stream); err != nil { + return nil, err + } if cfg == nil { return nil, ErrConsumerConfigRequired } @@ -295,8 +298,34 @@ type consumerDeleteResponse struct { Success bool `json:"success,omitempty"` } +func checkStreamName(stream string) error { + if stream == _EMPTY_ { + return ErrStreamNameRequired + } + if strings.Contains(stream, ".") { + return ErrInvalidStreamName + } + return nil +} + +func checkConsumerName(consumer string) error { + if consumer == _EMPTY_ { + return ErrConsumerNameRequired + } + if strings.Contains(consumer, ".") { + return ErrInvalidConsumerName + } + return nil +} + // DeleteConsumer deletes a Consumer. func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { + if err := checkStreamName(stream); err != nil { + return err + } + if err := checkConsumerName(consumer); err != nil { + return err + } o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return err @@ -305,10 +334,6 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { defer cancel() } - if stream == _EMPTY_ { - return ErrStreamNameRequired - } - dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer)) r, err := js.apiRequestWithContext(o.ctx, dcSubj, nil) if err != nil { @@ -330,6 +355,12 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { // ConsumerInfo returns information about a Consumer. func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) { + if err := checkStreamName(stream); err != nil { + return nil, err + } + if err := checkConsumerName(consumer); err != nil { + return nil, err + } o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return nil, err @@ -369,8 +400,8 @@ func (c *consumerLister) Next() bool { if c.err != nil { return false } - if c.stream == _EMPTY_ { - c.err = ErrStreamNameRequired + if err := checkStreamName(c.stream); err != nil { + c.err = err return false } if c.pageInfo != nil && c.offset >= c.pageInfo.Total { @@ -474,8 +505,8 @@ func (c *consumerNamesLister) Next() bool { if c.err != nil { return false } - if c.stream == _EMPTY_ { - c.err = ErrStreamNameRequired + if err := checkStreamName(c.stream); err != nil { + c.err = err return false } if c.pageInfo != nil && c.offset >= c.pageInfo.Total { @@ -556,6 +587,12 @@ type streamCreateResponse struct { } func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { + if cfg == nil { + return nil, ErrStreamConfigRequired + } + if err := checkStreamName(cfg.Name); err != nil { + return nil, err + } o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return nil, err @@ -564,14 +601,6 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { defer cancel() } - if cfg == nil || cfg.Name == _EMPTY_ { - return nil, ErrStreamNameRequired - } - - if strings.Contains(cfg.Name, ".") { - return nil, ErrInvalidStreamName - } - req, err := json.Marshal(cfg) if err != nil { return nil, err @@ -599,10 +628,9 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { type streamInfoResponse = streamCreateResponse func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) { - if strings.Contains(stream, ".") { - return nil, ErrInvalidStreamName + if err := checkStreamName(stream); err != nil { + return nil, err } - o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return nil, err @@ -678,6 +706,12 @@ type PeerInfo struct { // UpdateStream updates a Stream. func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { + if cfg == nil { + return nil, ErrStreamConfigRequired + } + if err := checkStreamName(cfg.Name); err != nil { + return nil, err + } o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return nil, err @@ -686,10 +720,6 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error defer cancel() } - if cfg == nil || cfg.Name == _EMPTY_ { - return nil, ErrStreamNameRequired - } - req, err := json.Marshal(cfg) if err != nil { return nil, err @@ -718,6 +748,9 @@ type streamDeleteResponse struct { // DeleteStream deletes a Stream. func (js *js) DeleteStream(name string, opts ...JSOpt) error { + if err := checkStreamName(name); err != nil { + return err + } o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return err @@ -726,10 +759,6 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { defer cancel() } - if name == _EMPTY_ { - return ErrStreamNameRequired - } - dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name)) r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil) if err != nil { @@ -905,6 +934,9 @@ type streamPurgeResponse struct { // PurgeStream purges messages on a Stream. func (js *js) PurgeStream(stream string, opts ...JSOpt) error { + if err := checkStreamName(stream); err != nil { + return err + } return js.purgeStream(stream, nil) } diff --git a/nats.go b/nats.go index 146eff8e1..8387087c6 100644 --- a/nats.go +++ b/nats.go @@ -145,11 +145,13 @@ var ( ErrNotJSMessage = errors.New("nats: not a jetstream message") ErrInvalidStreamName = errors.New("nats: invalid stream name") ErrInvalidDurableName = errors.New("nats: invalid durable name") + ErrInvalidConsumerName = errors.New("nats: invalid consumer name") ErrNoMatchingStream = errors.New("nats: no stream matches subject") ErrSubjectMismatch = errors.New("nats: subject does not match consumer") ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") + ErrStreamConfigRequired = errors.New("nats: stream configuration is required") ErrStreamNameRequired = errors.New("nats: stream name is required") ErrStreamNotFound = errors.New("nats: stream not found") ErrConsumerNotFound = errors.New("nats: consumer not found") diff --git a/test/js_test.go b/test/js_test.go index 55cb57883..43554efbb 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1123,9 +1123,6 @@ func TestJetStreamManagement(t *testing.T) { // Create the stream using our client API. var si *nats.StreamInfo t.Run("create stream", func(t *testing.T) { - if _, err := js.AddStream(nil); err == nil { - t.Fatalf("Unexpected success") - } si, err := js.AddStream(&nats.StreamConfig{Name: "foo"}) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1161,22 +1158,35 @@ func TestJetStreamManagement(t *testing.T) { }) t.Run("create bad stream", func(t *testing.T) { - _, err := js.AddStream(&nats.StreamConfig{Name: "foo.invalid"}) - if err != nats.ErrInvalidStreamName { - t.Fatalf("Unexpected error: %v", err) + if _, err := js.AddStream(nil); err != nats.ErrStreamConfigRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamConfigRequired, err) + } + if _, err := js.AddStream(&nats.StreamConfig{Name: ""}); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamNameRequired, err) + } + if _, err := js.AddStream(&nats.StreamConfig{Name: "bad.stream.name"}); err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidStreamName, err) } }) t.Run("bad stream info", func(t *testing.T) { - _, err := js.StreamInfo("foo.invalid") - if err != nats.ErrInvalidStreamName { - t.Fatalf("Unexpected error: %v", err) + if _, err := js.StreamInfo(""); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamNameRequired, err) + } + if _, err := js.StreamInfo("bad.stream.name"); err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidStreamName, err) } }) t.Run("stream update", func(t *testing.T) { - if _, err := js.UpdateStream(nil); err == nil { - t.Fatal("Unexpected success") + if _, err := js.UpdateStream(nil); err != nats.ErrStreamConfigRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamConfigRequired, err) + } + if _, err := js.UpdateStream(&nats.StreamConfig{Name: ""}); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamNameRequired, err) + } + if _, err := js.UpdateStream(&nats.StreamConfig{Name: "bad.stream.name"}); err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidStreamName, err) } prevMaxMsgs := si.Config.MaxMsgs si, err = js.UpdateStream(&nats.StreamConfig{Name: "foo", MaxMsgs: prevMaxMsgs + 100}) @@ -1209,7 +1219,34 @@ func TestJetStreamManagement(t *testing.T) { } }) + t.Run("create consumer check params", func(t *testing.T) { + _, err = js.AddConsumer("", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + if err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got: %v", nats.ErrStreamNameRequired, err) + } + _, err = js.AddConsumer("bad.stream.name", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy}) + if err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got: %v", nats.ErrInvalidStreamName, err) + } + _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "bad.consumer.name", AckPolicy: nats.AckExplicitPolicy}) + if err != nats.ErrInvalidDurableName { + t.Fatalf("Expected %v, got: %v", nats.ErrInvalidDurableName, err) + } + }) + t.Run("consumer info", func(t *testing.T) { + if _, err := js.ConsumerInfo("", "dlc"); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamNameRequired, err) + } + if _, err := js.ConsumerInfo("bad.stream.name", "dlc"); err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidStreamName, err) + } + if _, err := js.ConsumerInfo("foo", ""); err != nats.ErrConsumerNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrConsumerNameRequired, err) + } + if _, err := js.ConsumerInfo("foo", "bad.consumer.name"); err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidConsumerName, err) + } ci, err := js.ConsumerInfo("foo", "dlc") if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1247,7 +1284,12 @@ func TestJetStreamManagement(t *testing.T) { if len(infos) != 0 { t.Fatalf("ConsumerInfo is not correct %+v", infos) } - + for info := range js.ConsumersInfo("bad.stream.name") { + infos = append(infos, info) + } + if len(infos) != 0 { + t.Fatalf("ConsumerInfo is not correct %+v", infos) + } infos = infos[:0] for info := range js.ConsumersInfo("foo") { infos = append(infos, info) @@ -1270,8 +1312,17 @@ func TestJetStreamManagement(t *testing.T) { }) t.Run("delete consumers", func(t *testing.T) { - if err := js.DeleteConsumer("", ""); err == nil { - t.Fatalf("Unexpected success") + if err := js.DeleteConsumer("", "dlc"); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamNameRequired, err) + } + if err := js.DeleteConsumer("bad.stream.name", "dlc"); err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidStreamName, err) + } + if err := js.DeleteConsumer("foo", ""); err != nats.ErrConsumerNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrConsumerNameRequired, err) + } + if err := js.DeleteConsumer("foo", "bad.consumer.name"); err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidConsumerName, err) } if err := js.DeleteConsumer("foo", "dlc"); err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1302,12 +1353,23 @@ func TestJetStreamManagement(t *testing.T) { if err != nats.ErrStreamNameRequired { t.Fatalf("Expected stream name required error, got %v", err) } + // Check that stream name is valid + _, err = js.UpdateConsumer("bad.stream.name", &expected) + if err != nats.ErrInvalidStreamName { + t.Fatalf("Expected stream name required error, got %v", err) + } // Check that durable name is required expected.Durable = "" _, err = js.UpdateConsumer("foo", &expected) if err != nats.ErrInvalidDurableName { t.Fatalf("Expected consumer name required error, got %v", err) } + // Check that durable name is valid + expected.Durable = "bad.consumer.name" + _, err = js.UpdateConsumer("foo", &expected) + if err != nats.ErrInvalidDurableName { + t.Fatalf("Expected consumer name required error, got %v", err) + } expected.Durable = "update_push_consumer" // Check that configuration is required @@ -1356,6 +1418,12 @@ func TestJetStreamManagement(t *testing.T) { }) t.Run("purge stream", func(t *testing.T) { + if err := js.PurgeStream(""); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamNameRequired, err) + } + if err := js.PurgeStream("bad.stream.name"); err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidStreamName, err) + } if err := js.PurgeStream("foo"); err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1379,8 +1447,11 @@ func TestJetStreamManagement(t *testing.T) { }) t.Run("delete stream", func(t *testing.T) { - if err := js.DeleteStream(""); err == nil { - t.Fatal("Unexpected success") + if err := js.DeleteStream(""); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected %v, got %v", nats.ErrStreamNameRequired, err) + } + if err := js.DeleteStream("bad.stream.name"); err != nats.ErrInvalidStreamName { + t.Fatalf("Expected %v, got %v", nats.ErrInvalidStreamName, err) } if err := js.DeleteStream("foo"); err != nil { t.Fatalf("Unexpected error: %v", err)