Skip to content

Commit

Permalink
Adopt MaxBytes test
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Jan 26, 2023
1 parent f7dff23 commit 3416be6
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions server/jetstream_test.go
Expand Up @@ -16808,25 +16808,39 @@ func TestJetStreamPullMaxBytes(t *testing.T) {
req = &JSApiConsumerGetNextRequest{Batch: 1, MaxBytes: 10_000_000, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 1)
// we expect two messages, as the second one should be `Batch Completed` status.
checkSubsPending(t, sub, 2)

// first one is message from the stream.
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
// second one is the status.
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
if v := m.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}
checkSubsPending(t, sub, 0)

// Same but with batch > 1
req = &JSApiConsumerGetNextRequest{Batch: 5, MaxBytes: 10_000_000, NoWait: true}
jreq, _ = json.Marshal(req)
nc.PublishRequest(subj, reply, jreq)
checkSubsPending(t, sub, 5)
// 6, not 5, as 6th is the status.
checkSubsPending(t, sub, 6)
for i := 0; i < 5; i++ {
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_True(t, len(m.Data) == dsz)
require_True(t, len(m.Header) == 0)
}
m, err = sub.NextMsg(time.Second)
require_NoError(t, err)
if v := m.Header.Get("Description"); v != "Batch Completed" {
t.Fatalf("Expected Batch Completed, got: %s", v)
}
checkSubsPending(t, sub, 0)

// Now ask for large batch but make sure we are limited by batch size.
Expand Down

0 comments on commit 3416be6

Please sign in to comment.