Skip to content

Commit

Permalink
[FIXED] JetStream: check consumer name validity in APIs
Browse files Browse the repository at this point in the history
For ConsumerInfo and DeleteConsumer, we need to check that the
consumer name is valid, that is, does not contains ".", otherwise
the request would be dropped by older servers. But even with
servers returning "bad request", it is not meaningful enough, so
the library should inform the user if the name is invalid.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Apr 4, 2022
1 parent e076b0d commit 7c6835c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
22 changes: 18 additions & 4 deletions jsm.go
Expand Up @@ -295,6 +295,19 @@ type consumerDeleteResponse struct {
Success bool `json:"success,omitempty"`
}

func checkStreamAndConsumerName(stream, consumer string) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
}
if strings.Contains(consumer, ".") {
return ErrInvalidConsumerName
}
return nil
}

// DeleteConsumer deletes a Consumer.
func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
o, cancel, err := getJSContextOpts(js.opts, opts...)
Expand All @@ -304,11 +317,9 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
if cancel != nil {
defer cancel()
}

if stream == _EMPTY_ {
return ErrStreamNameRequired
if err := checkStreamAndConsumerName(stream, consumer); err != nil {
return err
}

dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
r, err := js.apiRequestWithContext(o.ctx, dcSubj, nil)
if err != nil {
Expand Down Expand Up @@ -337,6 +348,9 @@ func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInf
if cancel != nil {
defer cancel()
}
if err := checkStreamAndConsumerName(stream, consumer); err != nil {
return nil, err
}
return js.getConsumerInfoContext(o.ctx, stream, consumer)
}

Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -145,6 +145,7 @@ var (
ErrNotJSMessage = errors.New("nats: not a jetstream message")
ErrInvalidStreamName = errors.New("nats: invalid stream name")
ErrInvalidDurableName = errors.New("nats: invalid durable name")
ErrInvalidConsumerName = errors.New("nats: invalid consumer name")
ErrNoMatchingStream = errors.New("nats: no stream matches subject")
ErrSubjectMismatch = errors.New("nats: subject does not match consumer")
ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set")
Expand Down
23 changes: 23 additions & 0 deletions test/js_test.go
Expand Up @@ -786,6 +786,29 @@ func TestJetStreamSubscribe(t *testing.T) {
if ci, err := js.ConsumerInfo("TEST", name); err == nil {
t.Fatalf("Expected no consumer to exist, got %+v", ci)
}

// Check that stream name is required
if _, err := js.ConsumerInfo("", name); err != nats.ErrStreamNameRequired {
t.Fatalf("Expected error %v, got %v", nats.ErrStreamNameRequired, err)
}
// Check that consumer name is required
if _, err := js.ConsumerInfo("TEST", ""); err != nats.ErrConsumerNameRequired {
t.Fatalf("Expected error %v, got %v", nats.ErrConsumerNameRequired, err)
}
// Check that we are checking that the consumer name is valid (no ".")
if _, err := js.ConsumerInfo("TEST", "bad.consumer.name"); err != nats.ErrInvalidConsumerName {
t.Fatalf("Expected error %v, got %v", nats.ErrInvalidConsumerName, err)
}
// Same for delete consumer
if err := js.DeleteConsumer("", name); err != nats.ErrStreamNameRequired {
t.Fatalf("Expected error %v, got %v", nats.ErrStreamNameRequired, err)
}
if err := js.DeleteConsumer("TEST", ""); err != nats.ErrConsumerNameRequired {
t.Fatalf("Expected error %v, got %v", nats.ErrConsumerNameRequired, err)
}
if err := js.DeleteConsumer("TEST", "bad.consumer.name"); err != nats.ErrInvalidConsumerName {
t.Fatalf("Expected error %v, got %v", nats.ErrInvalidConsumerName, err)
}
}

func TestJetStreamAckPending_Pull(t *testing.T) {
Expand Down

0 comments on commit 7c6835c

Please sign in to comment.