From aedc479e75268ba73a45377e84ebf0a620f3228b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 5 Oct 2021 09:21:29 -0600 Subject: [PATCH] Fixed ordered consumer test Several issues: - We send a "EOF" empty message which should not be counted as the number of chunks used to reconstitute the asset - Some "message filters" that are removed as part of the execution of the filter's callback would not be put back for the following "sync" test (we test async then sync). Signed-off-by: Ivan Kozlovic --- js_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/js_test.go b/js_test.go index 90c03fe64..9685ce154 100644 --- a/js_test.go +++ b/js_test.go @@ -105,7 +105,10 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } else { chunk = msg[i : i+chunkSize] } - js.PublishAsync("a", chunk) + msg := NewMsg("a") + msg.Data = chunk + msg.Header.Set("data", "true") + js.PublishMsgAsync(msg) } js.PublishAsync("a", nil) // eof @@ -171,7 +174,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { t.Fatalf("Objects do not match") } case <-time.After(5 * time.Second): - t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs) + t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1) } } @@ -202,7 +205,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { rmsg = append(rmsg, m.Data...) } if !done { - t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs) + t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1) } if rsum := sha256.Sum256(rmsg); rsum != sum { t.Fatalf("Objects do not match") @@ -215,7 +218,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { // Now introduce some loss. singleLoss := func(m *Msg) *Msg { - if rand.Intn(100) <= 10 { + if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ { nc.removeMsgFilter("a") return nil } @@ -223,10 +226,11 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", singleLoss) testConsumer() + nc.addMsgFilter("a", singleLoss) testSyncConsumer() multiLoss := func(m *Msg) *Msg { - if rand.Intn(100) <= 10 { + if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ { return nil } return m @@ -246,11 +250,12 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", firstOnly) testConsumer() + nc.addMsgFilter("a", firstOnly) testSyncConsumer() lastOnly := func(m *Msg) *Msg { if meta, err := m.Metadata(); err == nil { - if meta.Sequence.Stream >= si.State.LastSeq { + if meta.Sequence.Stream >= si.State.LastSeq-1 { nc.removeMsgFilter("a") return nil } @@ -259,6 +264,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", lastOnly) testConsumer() + nc.addMsgFilter("a", lastOnly) testSyncConsumer() }