Skip to content

Commit

Permalink
Fixed ordered consumer test
Browse files Browse the repository at this point in the history
Several issues:
- We send a "EOF" empty message which should not be counted as
the number of chunks used to reconstitute the asset
- Some "message filters" that are removed as part of the execution
of the filter's callback would not be put back for the following
"sync" test (we test async then sync).

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Oct 5, 2021
1 parent b9e5de3 commit fbe2b65
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions js_test.go
Expand Up @@ -105,7 +105,10 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
} else {
chunk = msg[i : i+chunkSize]
}
js.PublishAsync("a", chunk)
msg := NewMsg("a")
msg.Data = chunk
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}
js.PublishAsync("a", nil) // eof

Expand Down Expand Up @@ -171,7 +174,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
t.Fatalf("Objects do not match")
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs)
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1)
}
}

Expand Down Expand Up @@ -202,7 +205,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
rmsg = append(rmsg, m.Data...)
}
if !done {
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs)
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1)
}
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
Expand All @@ -215,18 +218,19 @@ func TestJetStreamOrderedConsumer(t *testing.T) {

// Now introduce some loss.
singleLoss := func(m *Msg) *Msg {
if rand.Intn(100) <= 10 {
if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ {
nc.removeMsgFilter("a")
return nil
}
return m
}
nc.addMsgFilter("a", singleLoss)
testConsumer()
nc.addMsgFilter("a", singleLoss)
testSyncConsumer()

multiLoss := func(m *Msg) *Msg {
if rand.Intn(100) <= 10 {
if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ {
return nil
}
return m
Expand All @@ -246,11 +250,12 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
}
nc.addMsgFilter("a", firstOnly)
testConsumer()
nc.addMsgFilter("a", firstOnly)
testSyncConsumer()

lastOnly := func(m *Msg) *Msg {
if meta, err := m.Metadata(); err == nil {
if meta.Sequence.Stream >= si.State.LastSeq {
if meta.Sequence.Stream >= si.State.LastSeq-1 {
nc.removeMsgFilter("a")
return nil
}
Expand All @@ -259,6 +264,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
}
nc.addMsgFilter("a", lastOnly)
testConsumer()
nc.addMsgFilter("a", lastOnly)
testSyncConsumer()
}

Expand Down

0 comments on commit fbe2b65

Please sign in to comment.