diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 1b84974609..12d79c737f 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -3471,6 +3471,16 @@ func TestJetStreamPublishExpect(t *testing.T) { t.Fatalf("Expected an error, got %q", resp.Data) } + // Or if we expect that there are no messages by setting "0" as the expected last seq + m.Header.Set(JSExpectedLastSeq, "0") + resp, err = nc.RequestMsg(m, 100*time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if pa := getPubAckResponse(resp.Data); pa == nil || pa.Error == nil { + t.Fatalf("Expected an error, got %q", resp.Data) + } + // Now send a message with a message ID and make sure we can match that. m = nats.NewMsg("foo.bar") m.Data = []byte("HELLO") diff --git a/server/stream.go b/server/stream.go index 51dab7c6f8..75828de2a0 100644 --- a/server/stream.go +++ b/server/stream.go @@ -2761,12 +2761,12 @@ func getExpectedStream(hdr []byte) string { } // Fast lookup of expected stream. -func getExpectedLastSeq(hdr []byte) uint64 { +func getExpectedLastSeq(hdr []byte) (uint64, bool) { bseq := getHeader(JSExpectedLastSeq, hdr) if len(bseq) == 0 { - return 0 + return 0, false } - return uint64(parseInt64(bseq)) + return uint64(parseInt64(bseq)), true } // Fast lookup of rollups. @@ -2954,7 +2954,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, return errors.New("expected stream does not match") } // Expected last sequence. - if seq := getExpectedLastSeq(hdr); seq > 0 && seq != mset.lseq { + if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.lseq { mlseq := mset.lseq mset.clfs++ mset.mu.Unlock()