From 7c6835cc811f73e8d2ee5fe74f7619618594695d Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 4 Apr 2022 12:31:08 -0600 Subject: [PATCH] [FIXED] JetStream: check consumer name validity in APIs For ConsumerInfo and DeleteConsumer, we need to check that the consumer name is valid, that is, does not contains ".", otherwise the request would be dropped by older servers. But even with servers returning "bad request", it is not meaningful enough, so the library should inform the user if the name is invalid. Signed-off-by: Ivan Kozlovic --- jsm.go | 22 ++++++++++++++++++---- nats.go | 1 + test/js_test.go | 23 +++++++++++++++++++++++ 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/jsm.go b/jsm.go index 11545f318..ddf34a9b7 100644 --- a/jsm.go +++ b/jsm.go @@ -295,6 +295,19 @@ type consumerDeleteResponse struct { Success bool `json:"success,omitempty"` } +func checkStreamAndConsumerName(stream, consumer string) error { + if stream == _EMPTY_ { + return ErrStreamNameRequired + } + if consumer == _EMPTY_ { + return ErrConsumerNameRequired + } + if strings.Contains(consumer, ".") { + return ErrInvalidConsumerName + } + return nil +} + // DeleteConsumer deletes a Consumer. func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { o, cancel, err := getJSContextOpts(js.opts, opts...) @@ -304,11 +317,9 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error { if cancel != nil { defer cancel() } - - if stream == _EMPTY_ { - return ErrStreamNameRequired + if err := checkStreamAndConsumerName(stream, consumer); err != nil { + return err } - dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer)) r, err := js.apiRequestWithContext(o.ctx, dcSubj, nil) if err != nil { @@ -337,6 +348,9 @@ func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInf if cancel != nil { defer cancel() } + if err := checkStreamAndConsumerName(stream, consumer); err != nil { + return nil, err + } return js.getConsumerInfoContext(o.ctx, stream, consumer) } diff --git a/nats.go b/nats.go index 146eff8e1..c465f5333 100644 --- a/nats.go +++ b/nats.go @@ -145,6 +145,7 @@ var ( ErrNotJSMessage = errors.New("nats: not a jetstream message") ErrInvalidStreamName = errors.New("nats: invalid stream name") ErrInvalidDurableName = errors.New("nats: invalid durable name") + ErrInvalidConsumerName = errors.New("nats: invalid consumer name") ErrNoMatchingStream = errors.New("nats: no stream matches subject") ErrSubjectMismatch = errors.New("nats: subject does not match consumer") ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") diff --git a/test/js_test.go b/test/js_test.go index 55cb57883..223b2b646 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -786,6 +786,29 @@ func TestJetStreamSubscribe(t *testing.T) { if ci, err := js.ConsumerInfo("TEST", name); err == nil { t.Fatalf("Expected no consumer to exist, got %+v", ci) } + + // Check that stream name is required + if _, err := js.ConsumerInfo("", name); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected error %v, got %v", nats.ErrStreamNameRequired, err) + } + // Check that consumer name is required + if _, err := js.ConsumerInfo("TEST", ""); err != nats.ErrConsumerNameRequired { + t.Fatalf("Expected error %v, got %v", nats.ErrConsumerNameRequired, err) + } + // Check that we are checking that the consumer name is valid (no ".") + if _, err := js.ConsumerInfo("TEST", "bad.consumer.name"); err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected error %v, got %v", nats.ErrInvalidConsumerName, err) + } + // Same for delete consumer + if err := js.DeleteConsumer("", name); err != nats.ErrStreamNameRequired { + t.Fatalf("Expected error %v, got %v", nats.ErrStreamNameRequired, err) + } + if err := js.DeleteConsumer("TEST", ""); err != nats.ErrConsumerNameRequired { + t.Fatalf("Expected error %v, got %v", nats.ErrConsumerNameRequired, err) + } + if err := js.DeleteConsumer("TEST", "bad.consumer.name"); err != nats.ErrInvalidConsumerName { + t.Fatalf("Expected error %v, got %v", nats.ErrInvalidConsumerName, err) + } } func TestJetStreamAckPending_Pull(t *testing.T) {