From 46fc269f6c73a8d6cddd63b3705232502b59f281 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 14 Apr 2022 12:53:25 -0600 Subject: [PATCH] [CHANGED] JetStream: accept "Nats-Expected-Last-Sequence" with "0" We use to ignore if the seq was 0, but now would treat it as a requirement that the stream be empty if header is present but set to 0. This relates to client PR: https://github.com/nats-io/nats.go/pull/958 Signed-off-by: Ivan Kozlovic --- server/jetstream_test.go | 10 ++++++++++ server/stream.go | 8 ++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index cc79d65934..28687c261f 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -3478,6 +3478,16 @@ func TestJetStreamPublishExpect(t *testing.T) { t.Fatalf("Expected an error, got %q", resp.Data) } + // Or if we expect that there are no messages by setting "0" as the expected last seq + m.Header.Set(JSExpectedLastSeq, "0") + resp, err = nc.RequestMsg(m, 100*time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil { + t.Fatalf("Expected an error, got %q", resp.Data) + } + // Now send a message with a message ID and make sure we can match that. m = nats.NewMsg("foo.bar") m.Data = []byte("HELLO") diff --git a/server/stream.go b/server/stream.go index 6b62741bcc..cb3ec3ebff 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3187,12 +3187,12 @@ func getExpectedStream(hdr []byte) string { } // Fast lookup of expected stream. -func getExpectedLastSeq(hdr []byte) uint64 { +func getExpectedLastSeq(hdr []byte) (uint64, bool) { bseq := getHeader(JSExpectedLastSeq, hdr) if len(bseq) == 0 { - return 0 + return 0, false } - return uint64(parseInt64(bseq)) + return uint64(parseInt64(bseq)), true } // Fast lookup of rollups. @@ -3472,7 +3472,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, return errors.New("expected stream does not match") } // Expected last sequence. - if seq := getExpectedLastSeq(hdr); seq > 0 && seq != mset.lseq { + if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.lseq { mlseq := mset.lseq mset.clfs++ mset.mu.Unlock()