From 43b2641591d4726a1d6b776f7bc3b83864132b58 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 14 Apr 2022 12:53:25 -0600 Subject: [PATCH] [CHANGED] JetStream: accept "Nats-Expected-Last-Sequence" with "0" We use to ignore if the seq was 0, but now would treat it as a requirement that the stream be empty if header is present but set to 0. This relates to client PR: https://github.com/nats-io/nats.go/pull/958 Signed-off-by: Ivan Kozlovic --- server/jetstream_test.go | 10 ++++++++++ server/stream.go | 8 ++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 1b84974609..12d79c737f 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -3471,6 +3471,16 @@ func TestJetStreamPublishExpect(t *testing.T) { t.Fatalf("Expected an error, got %q", resp.Data) } + // Or if we expect that there are no messages by setting "0" as the expected last seq + m.Header.Set(JSExpectedLastSeq, "0") + resp, err = nc.RequestMsg(m, 100*time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil { + t.Fatalf("Expected an error, got %q", resp.Data) + } + // Now send a message with a message ID and make sure we can match that. m = nats.NewMsg("foo.bar") m.Data = []byte("HELLO") diff --git a/server/stream.go b/server/stream.go index 51dab7c6f8..75828de2a0 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2761,12 +2761,12 @@ func getExpectedStream(hdr []byte) string { } // Fast lookup of expected stream. -func getExpectedLastSeq(hdr []byte) uint64 { +func getExpectedLastSeq(hdr []byte) (uint64, bool) { bseq := getHeader(JSExpectedLastSeq, hdr) if len(bseq) == 0 { - return 0 + return 0, false } - return uint64(parseInt64(bseq)) + return uint64(parseInt64(bseq)), true } // Fast lookup of rollups. @@ -2954,7 +2954,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, return errors.New("expected stream does not match") } // Expected last sequence. - if seq := getExpectedLastSeq(hdr); seq > 0 && seq != mset.lseq { + if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.lseq { mlseq := mset.lseq mset.clfs++ mset.mu.Unlock()