diff --git a/js.go b/js.go index 23480abc7..7001220e1 100644 --- a/js.go +++ b/js.go @@ -336,10 +336,10 @@ type pubOpts struct { ctx context.Context ttl time.Duration id string - lid string // Expected last msgId - str string // Expected stream name - seq uint64 // Expected last sequence - lss uint64 // Expected last sequence per subject + lid string // Expected last msgId + str string // Expected stream name + seq *uint64 // Expected last sequence + lss *uint64 // Expected last sequence per subject // Publish retries for NoResponders err. rwait time.Duration // Retry wait between attempts @@ -415,11 +415,11 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { if o.str != _EMPTY_ { m.Header.Set(ExpectedStreamHdr, o.str) } - if o.seq > 0 { - m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) + if o.seq != nil { + m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10)) } - if o.lss > 0 { - m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10)) + if o.lss != nil { + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10)) } var resp *Msg @@ -749,11 +749,11 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if o.str != _EMPTY_ { m.Header.Set(ExpectedStreamHdr, o.str) } - if o.seq > 0 { - m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) + if o.seq != nil { + m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10)) } - if o.lss > 0 { - m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10)) + if o.lss != nil { + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10)) } // Reply @@ -822,7 +822,7 @@ func ExpectStream(stream string) PubOpt { // ExpectLastSequence sets the expected sequence in the response from the publish. func ExpectLastSequence(seq uint64) PubOpt { return pubOptFn(func(opts *pubOpts) error { - opts.seq = seq + opts.seq = &seq return nil }) } @@ -830,7 +830,7 @@ func ExpectLastSequence(seq uint64) PubOpt { // ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish. func ExpectLastSequencePerSubject(seq uint64) PubOpt { return pubOptFn(func(opts *pubOpts) error { - opts.lss = seq + opts.lss = &seq return nil }) } diff --git a/test/js_test.go b/test/js_test.go index 43554efbb..b56a2529b 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -5481,6 +5481,85 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) { fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds()) } +func TestJetStreamPublishExpectZero(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"test", "foo", "bar"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := nc.SubscribeSync("foo") + if err != nil { + t.Errorf("Error: %s", err) + } + + // Explicitly set the header to zero. + _, err = js.Publish("foo", []byte("bar"), + nats.ExpectLastSequence(0), + nats.ExpectLastSequencePerSubject(0), + ) + if err != nil { + t.Errorf("Error: %v", err) + } + + rawMsg, err := js.GetMsg("TEST", 1) + if err != nil { + t.Fatalf("Error: %s", err) + } + hdr, ok := rawMsg.Header["Nats-Expected-Last-Sequence"] + if !ok { + t.Fatal("Missing header") + } + got := hdr[0] + expected := "0" + if got != expected { + t.Fatalf("Expected %v, got: %v", expected, got) + } + hdr, ok = rawMsg.Header["Nats-Expected-Last-Subject-Sequence"] + if !ok { + t.Fatal("Missing header") + } + got = hdr[0] + expected = "0" + if got != expected { + t.Fatalf("Expected %v, got: %v", expected, got) + } + + msg, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error: %s", err) + } + hdr, ok = msg.Header["Nats-Expected-Last-Sequence"] + if !ok { + t.Fatal("Missing header") + } + got = hdr[0] + expected = "0" + if got != expected { + t.Fatalf("Expected %v, got: %v", expected, got) + } + hdr, ok = msg.Header["Nats-Expected-Last-Subject-Sequence"] + if !ok { + t.Fatal("Missing header") + } + got = hdr[0] + expected = "0" + if got != expected { + t.Fatalf("Expected %v, got: %v", expected, got) + } +} + func TestJetStreamBindConsumer(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s)