Skip to content

Commit

Permalink
[FIXED] JetStream: check stream and consumer names
Browse files Browse the repository at this point in the history
Many APIs were not properly checking that the stream and/or
consumer names were provided and valid (no "." in them), which
could lead to requests being sent on a subject that the server
has no interest on and no response was therefore sent to the
client library, which would fail with a timeout/context deadline.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Apr 4, 2022
1 parent e076b0d commit 3e2512a
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -7,7 +7,7 @@ install:
- go get -t ./...
- go get github.com/mattn/goveralls
- go get github.com/wadey/gocovmerge
- go get -u honnef.co/go/tools/cmd/staticcheck
- go install honnef.co/go/tools/cmd/staticcheck@v0.2.2
- go get -u github.com/client9/misspell/cmd/misspell
before_script:
- $(exit $(go fmt ./... | wc -l))
Expand Down
92 changes: 62 additions & 30 deletions jsm.go
Expand Up @@ -229,6 +229,9 @@ type consumerResponse struct {

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
Expand All @@ -237,9 +240,6 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
defer cancel()
}

if stream == _EMPTY_ {
return nil, ErrStreamNameRequired
}
req, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg})
if err != nil {
return nil, err
Expand Down Expand Up @@ -280,6 +280,9 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C
}

func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
if cfg == nil {
return nil, ErrConsumerConfigRequired
}
Expand All @@ -295,8 +298,34 @@ type consumerDeleteResponse struct {
Success bool `json:"success,omitempty"`
}

func checkStreamName(stream string) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
if strings.Contains(stream, ".") {
return ErrInvalidStreamName
}
return nil
}

func checkConsumerName(consumer string) error {
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
}
if strings.Contains(consumer, ".") {
return ErrInvalidConsumerName
}
return nil
}

// DeleteConsumer deletes a Consumer.
func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
if err := checkStreamName(stream); err != nil {
return err
}
if err := checkConsumerName(consumer); err != nil {
return err
}
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
Expand All @@ -305,10 +334,6 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
defer cancel()
}

if stream == _EMPTY_ {
return ErrStreamNameRequired
}

dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
r, err := js.apiRequestWithContext(o.ctx, dcSubj, nil)
if err != nil {
Expand All @@ -330,6 +355,12 @@ func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {

// ConsumerInfo returns information about a Consumer.
func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
if err := checkConsumerName(consumer); err != nil {
return nil, err
}
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -369,8 +400,8 @@ func (c *consumerLister) Next() bool {
if c.err != nil {
return false
}
if c.stream == _EMPTY_ {
c.err = ErrStreamNameRequired
if err := checkStreamName(c.stream); err != nil {
c.err = err
return false
}
if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
Expand Down Expand Up @@ -474,8 +505,8 @@ func (c *consumerNamesLister) Next() bool {
if c.err != nil {
return false
}
if c.stream == _EMPTY_ {
c.err = ErrStreamNameRequired
if err := checkStreamName(c.stream); err != nil {
c.err = err
return false
}
if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
Expand Down Expand Up @@ -556,6 +587,12 @@ type streamCreateResponse struct {
}

func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
if cfg == nil {
return nil, ErrStreamConfigRequired
}
if err := checkStreamName(cfg.Name); err != nil {
return nil, err
}
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
Expand All @@ -564,14 +601,6 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
defer cancel()
}

if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
}

if strings.Contains(cfg.Name, ".") {
return nil, ErrInvalidStreamName
}

req, err := json.Marshal(cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -599,10 +628,9 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
type streamInfoResponse = streamCreateResponse

func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if strings.Contains(stream, ".") {
return nil, ErrInvalidStreamName
if err := checkStreamName(stream); err != nil {
return nil, err
}

o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -678,6 +706,12 @@ type PeerInfo struct {

// UpdateStream updates a Stream.
func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
if cfg == nil {
return nil, ErrStreamConfigRequired
}
if err := checkStreamName(cfg.Name); err != nil {
return nil, err
}
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
Expand All @@ -686,10 +720,6 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
defer cancel()
}

if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
}

req, err := json.Marshal(cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -718,6 +748,9 @@ type streamDeleteResponse struct {

// DeleteStream deletes a Stream.
func (js *js) DeleteStream(name string, opts ...JSOpt) error {
if err := checkStreamName(name); err != nil {
return err
}
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return err
Expand All @@ -726,10 +759,6 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error {
defer cancel()
}

if name == _EMPTY_ {
return ErrStreamNameRequired
}

dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil)
if err != nil {
Expand Down Expand Up @@ -905,6 +934,9 @@ type streamPurgeResponse struct {

// PurgeStream purges messages on a Stream.
func (js *js) PurgeStream(stream string, opts ...JSOpt) error {
if err := checkStreamName(stream); err != nil {
return err
}
return js.purgeStream(stream, nil)
}

Expand Down
2 changes: 2 additions & 0 deletions nats.go
Expand Up @@ -145,11 +145,13 @@ var (
ErrNotJSMessage = errors.New("nats: not a jetstream message")
ErrInvalidStreamName = errors.New("nats: invalid stream name")
ErrInvalidDurableName = errors.New("nats: invalid durable name")
ErrInvalidConsumerName = errors.New("nats: invalid consumer name")
ErrNoMatchingStream = errors.New("nats: no stream matches subject")
ErrSubjectMismatch = errors.New("nats: subject does not match consumer")
ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set")
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
ErrStreamConfigRequired = errors.New("nats: stream configuration is required")
ErrStreamNameRequired = errors.New("nats: stream name is required")
ErrStreamNotFound = errors.New("nats: stream not found")
ErrConsumerNotFound = errors.New("nats: consumer not found")
Expand Down

0 comments on commit 3e2512a

Please sign in to comment.