diff --git a/js.go b/js.go index 34389d098..d14bfe278 100644 --- a/js.go +++ b/js.go @@ -260,6 +260,7 @@ type pubOpts struct { lid string // Expected last msgId str string // Expected stream name seq uint64 // Expected last sequence + lss uint64 // Expected last sequence per subject } // pubAckResponse is the ack response from the JetStream API when publishing a message. @@ -278,10 +279,11 @@ type PubAck struct { // Headers for published messages. const ( - MsgIdHdr = "Nats-Msg-Id" - ExpectedStreamHdr = "Nats-Expected-Stream" - ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" - ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" + MsgIdHdr = "Nats-Msg-Id" + ExpectedStreamHdr = "Nats-Expected-Stream" + ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" + ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence" + ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" ) // PublishMsg publishes a Msg to a stream from JetStream. @@ -317,6 +319,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { if o.seq > 0 { m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) } + if o.lss > 0 { + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10)) + } var resp *Msg var err error @@ -618,6 +623,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if o.seq > 0 { m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) } + if o.lss > 0 { + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10)) + } // Reply if m.Reply != _EMPTY_ { @@ -687,6 +695,14 @@ func ExpectLastSequence(seq uint64) PubOpt { }) } +// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish. +func ExpectLastSequencePerSubject(seq uint64) PubOpt { + return pubOptFn(func(opts *pubOpts) error { + opts.lss = seq + return nil + }) +} + // ExpectLastMsgId sets the expected last msgId in the response from the publish. func ExpectLastMsgId(id string) PubOpt { return pubOptFn(func(opts *pubOpts) error { diff --git a/test/js_test.go b/test/js_test.go index 660029d04..a54443042 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -264,6 +264,20 @@ func TestJetStreamPublish(t *testing.T) { if err != context.Canceled { t.Fatalf("Expected %q, got %q", context.Canceled, err) } + + // Test ExpectLastSequencePerSubject. Just make sure that we set the header. + sub, err = nc.SubscribeSync("test") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + js.Publish("test", []byte("msg"), nats.ExpectLastSequencePerSubject(1)) + m, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on next msg: %v", err) + } + if m.Header.Get(nats.ExpectedLastSubjSeqHdr) != "1" { + t.Fatalf("Header ExpectLastSequencePerSubject not set: %+v", m.Header) + } } func TestJetStreamSubscribe(t *testing.T) {