Skip to content

Commit

Permalink
Fix paging in stream and consumer names listing (#1060)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Aug 31, 2022
1 parent 6de9cf0 commit 5af7980
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 4 deletions.
22 changes: 18 additions & 4 deletions jsm.go
Expand Up @@ -547,7 +547,7 @@ type consumerNamesListResponse struct {
Consumers []string `json:"consumers"`
}

// Next fetches the next ConsumerInfo page.
// Next fetches the next consumer names page.
func (c *consumerNamesLister) Next() bool {
if c.err != nil {
return false
Expand All @@ -567,8 +567,15 @@ func (c *consumerNamesLister) Next() bool {
defer cancel()
}

req, err := json.Marshal(consumersRequest{
apiPagedRequest: apiPagedRequest{Offset: c.offset},
})
if err != nil {
c.err = err
return false
}
clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
r, err := c.js.apiRequestWithContext(ctx, clSubj, nil)
r, err := c.js.apiRequestWithContext(ctx, clSubj, req)
if err != nil {
c.err = err
return false
Expand Down Expand Up @@ -1286,7 +1293,7 @@ type streamNamesLister struct {
pageInfo *apiPaged
}

// Next fetches the next ConsumerInfo page.
// Next fetches the next stream names page.
func (l *streamNamesLister) Next() bool {
if l.err != nil {
return false
Expand All @@ -1302,7 +1309,14 @@ func (l *streamNamesLister) Next() bool {
defer cancel()
}

r, err := l.js.apiRequestWithContext(ctx, l.js.apiSubj(apiStreams), nil)
req, err := json.Marshal(streamNamesRequest{
apiPagedRequest: apiPagedRequest{Offset: l.offset},
})
if err != nil {
l.err = err
return false
}
r, err := l.js.apiRequestWithContext(ctx, l.js.apiSubj(apiStreams), req)
if err != nil {
l.err = err
return false
Expand Down
91 changes: 91 additions & 0 deletions test/js_test.go
Expand Up @@ -1647,6 +1647,97 @@ func TestJetStreamManagement(t *testing.T) {
})
}

func TestStreamLister(t *testing.T) {
tests := []struct {
name string
streamsNum int
}{
{
name: "single page",
streamsNum: 5,
},
{
name: "multi page",
streamsNum: 1025,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
for i := 0; i < test.streamsNum; i++ {
if _, err := js.AddStream(&nats.StreamConfig{Name: fmt.Sprintf("stream_%d", i)}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
names := make([]string, 0)
for name := range js.StreamNames() {
names = append(names, name)
}
if len(names) != test.streamsNum {
t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.streamsNum, len(names))
}
infos := make([]*nats.StreamInfo, 0)
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
if len(infos) != test.streamsNum {
t.Fatalf("Invalid number of streams; want: %d; got: %d", test.streamsNum, len(infos))
}
})
}
}

func TestConsumersLister(t *testing.T) {
tests := []struct {
name string
consumersNum int
}{
{
name: "single page",
consumersNum: 5,
},
{
name: "multi page",
consumersNum: 1025,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
js.AddStream(&nats.StreamConfig{Name: "foo"})
for i := 0; i < test.consumersNum; i++ {
if _, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: fmt.Sprintf("cons_%d", i), AckPolicy: nats.AckExplicitPolicy}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
names := make([]string, 0)
for name := range js.ConsumerNames("foo") {
names = append(names, name)
}
if len(names) != test.consumersNum {
t.Fatalf("Invalid number of consumer names; want: %d; got: %d", test.consumersNum, len(names))
}
infos := make([]*nats.ConsumerInfo, 0)
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
if len(infos) != test.consumersNum {
t.Fatalf("Invalid number of consumers; want: %d; got: %d", test.consumersNum, len(infos))
}
})
}
}

func TestAccountInfo(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 5af7980

Please sign in to comment.