diff --git a/jsm.go b/jsm.go index c04308315..ecea0c14c 100644 --- a/jsm.go +++ b/jsm.go @@ -547,7 +547,7 @@ type consumerNamesListResponse struct { Consumers []string `json:"consumers"` } -// Next fetches the next ConsumerInfo page. +// Next fetches the next consumer names page. func (c *consumerNamesLister) Next() bool { if c.err != nil { return false @@ -567,8 +567,15 @@ func (c *consumerNamesLister) Next() bool { defer cancel() } + req, err := json.Marshal(consumersRequest{ + apiPagedRequest: apiPagedRequest{Offset: c.offset}, + }) + if err != nil { + c.err = err + return false + } clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream)) - r, err := c.js.apiRequestWithContext(ctx, clSubj, nil) + r, err := c.js.apiRequestWithContext(ctx, clSubj, req) if err != nil { c.err = err return false @@ -1286,7 +1293,7 @@ type streamNamesLister struct { pageInfo *apiPaged } -// Next fetches the next ConsumerInfo page. +// Next fetches the next stream names page. func (l *streamNamesLister) Next() bool { if l.err != nil { return false @@ -1302,7 +1309,14 @@ func (l *streamNamesLister) Next() bool { defer cancel() } - r, err := l.js.apiRequestWithContext(ctx, l.js.apiSubj(apiStreams), nil) + req, err := json.Marshal(streamNamesRequest{ + apiPagedRequest: apiPagedRequest{Offset: l.offset}, + }) + if err != nil { + l.err = err + return false + } + r, err := l.js.apiRequestWithContext(ctx, l.js.apiSubj(apiStreams), req) if err != nil { l.err = err return false diff --git a/test/js_test.go b/test/js_test.go index 46f644077..3327b1590 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1647,6 +1647,97 @@ func TestJetStreamManagement(t *testing.T) { }) } +func TestStreamLister(t *testing.T) { + tests := []struct { + name string + streamsNum int + }{ + { + name: "single page", + streamsNum: 5, + }, + { + name: "multi page", + streamsNum: 1025, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + for i := 0; i < test.streamsNum; i++ { + if _, err := js.AddStream(&nats.StreamConfig{Name: fmt.Sprintf("stream_%d", i)}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + names := make([]string, 0) + for name := range js.StreamNames() { + names = append(names, name) + } + if len(names) != test.streamsNum { + t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.streamsNum, len(names)) + } + infos := make([]*nats.StreamInfo, 0) + for info := range js.StreamsInfo() { + infos = append(infos, info) + } + if len(infos) != test.streamsNum { + t.Fatalf("Invalid number of streams; want: %d; got: %d", test.streamsNum, len(infos)) + } + }) + } +} + +func TestConsumersLister(t *testing.T) { + tests := []struct { + name string + consumersNum int + }{ + { + name: "single page", + consumersNum: 5, + }, + { + name: "multi page", + consumersNum: 1025, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + js.AddStream(&nats.StreamConfig{Name: "foo"}) + for i := 0; i < test.consumersNum; i++ { + if _, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: fmt.Sprintf("cons_%d", i), AckPolicy: nats.AckExplicitPolicy}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + names := make([]string, 0) + for name := range js.ConsumerNames("foo") { + names = append(names, name) + } + if len(names) != test.consumersNum { + t.Fatalf("Invalid number of consumer names; want: %d; got: %d", test.consumersNum, len(names)) + } + infos := make([]*nats.ConsumerInfo, 0) + for info := range js.ConsumersInfo("foo") { + infos = append(infos, info) + } + if len(infos) != test.consumersNum { + t.Fatalf("Invalid number of consumers; want: %d; got: %d", test.consumersNum, len(infos)) + } + }) + } +} + func TestAccountInfo(t *testing.T) { tests := []struct { name string