Skip to content

Commit

Permalink
Fix consumer info if consumer was closed
Browse files Browse the repository at this point in the history
Co-authored-by: Derek Collison <derek@nats.io>
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
Jarema and derekcollison committed Sep 29, 2023
1 parent 15b4611 commit fd65d43
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -18,5 +18,6 @@ require (

require (
github.com/golang/protobuf v1.4.2 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/protobuf v1.23.0 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Expand Up @@ -17,6 +17,8 @@ github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand All @@ -33,6 +35,8 @@ golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqR
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
2 changes: 1 addition & 1 deletion server/consumer.go
Expand Up @@ -2493,7 +2493,7 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
o.mu.Lock()
mset := o.mset
if mset == nil || mset.srv == nil {
if o.closed || mset == nil || mset.srv == nil {
o.mu.Unlock()
return nil
}
Expand Down
8 changes: 7 additions & 1 deletion server/jetstream_api.go
Expand Up @@ -4286,7 +4286,13 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.ConsumerInfo = obs.info()

if resp.ConsumerInfo = obs.info(); resp.ConsumerInfo == nil {
// This consumer returned nil which means it's closed. Respond with not found.
resp.Error = NewJSConsumerNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}

Expand Down
69 changes: 69 additions & 0 deletions server/jetstream_consumer_test.go
Expand Up @@ -17,7 +17,9 @@
package server

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sort"
Expand All @@ -27,6 +29,7 @@ import (
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

func TestJetStreamConsumerMultipleFiltersRemoveFilters(t *testing.T) {
Expand Down Expand Up @@ -670,3 +673,69 @@ func TestJetStreamConsumerActionsUnmarshal(t *testing.T) {
})
}
}

// This test was added as in some cases consumer info was returning nil,
// causing Go client to panic.
func TestJetstreamConsumerInfoReturnNil(t *testing.T) {
// Skip this test, as it takes almost a minute to run.
// TODO(jrm): Figure out a way for running long running tests
// Maybe a build tag for slow tests run nightly in CI is
// a good solution.
t.Skip()

server := RunBasicJetStreamServer(t)
defer server.Shutdown()
nc, _ := jsClientConnect(t, server)
defer nc.Close()

js, err := jetstream.New(nc)
require_NoError(t, err)

s, err := js.CreateStream(context.Background(), jetstream.StreamConfig{
Name: "WQ-STREAM",
Retention: jetstream.WorkQueuePolicy,
Subjects: []string{"WQ.*"},
})
require_NoError(t, err)

// Prepare a list of 500 consumer names.
consumers := make([]string, 0)
for i := 0; i < 500; i++ {
consumers = append(consumers, fmt.Sprintf("WQ-CONSUMER-%d", i))
}

// Run the initial create for each, with small InactiveThreshold.
for i, consumer := range consumers {
_, err = s.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{
Name: consumer,
InactiveThreshold: 5 * time.Second,
FilterSubject: fmt.Sprintf("WQ.%d", i),
})
require_NoError(t, err)
}

// Call info on random one.
// If it was already deleted, recreate it.
for i := 0; i < 1_000_000; i++ {
consNum := rand.Intn(500)
consName := consumers[consNum]
// This calls consumer info and once in a while panics, because server returns nil
// instead of consumer info.
c, err := s.Consumer(context.Background(), consName)
if err != nil {
if errors.Is(err, jetstream.ErrConsumerNotFound) {
_, err = s.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{
Name: consName,
InactiveThreshold: 5 * time.Second,
FilterSubject: fmt.Sprintf("WQ.%d", consNum),
})
if err != nil {
continue
}
continue
}
}
cfg := c.CachedInfo().Config
_ = cfg.Name
}
}

0 comments on commit fd65d43

Please sign in to comment.