Skip to content

Commit

Permalink
Merge pull request #797 from nats-io/add_expected_last_subject_sequence
Browse files Browse the repository at this point in the history
[ADDED] ExpectLastSequencePerSubject() publish option
  • Loading branch information
kozlovic committed Aug 17, 2021
2 parents 12b9459 + ea700ff commit 61828e8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
24 changes: 20 additions & 4 deletions js.go
Expand Up @@ -260,6 +260,7 @@ type pubOpts struct {
lid string // Expected last msgId
str string // Expected stream name
seq uint64 // Expected last sequence
lss uint64 // Expected last sequence per subject
}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand All @@ -278,10 +279,11 @@ type PubAck struct {

// Headers for published messages.
const (
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
)

// PublishMsg publishes a Msg to a stream from JetStream.
Expand Down Expand Up @@ -317,6 +319,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
}

var resp *Msg
var err error
Expand Down Expand Up @@ -618,6 +623,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
}

// Reply
if m.Reply != _EMPTY_ {
Expand Down Expand Up @@ -687,6 +695,14 @@ func ExpectLastSequence(seq uint64) PubOpt {
})
}

// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func ExpectLastSequencePerSubject(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lss = seq
return nil
})
}

// ExpectLastMsgId sets the expected last msgId in the response from the publish.
func ExpectLastMsgId(id string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
Expand Down
14 changes: 14 additions & 0 deletions test/js_test.go
Expand Up @@ -264,6 +264,20 @@ func TestJetStreamPublish(t *testing.T) {
if err != context.Canceled {
t.Fatalf("Expected %q, got %q", context.Canceled, err)
}

// Test ExpectLastSequencePerSubject. Just make sure that we set the header.
sub, err = nc.SubscribeSync("test")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
js.Publish("test", []byte("msg"), nats.ExpectLastSequencePerSubject(1))
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on next msg: %v", err)
}
if m.Header.Get(nats.ExpectedLastSubjSeqHdr) != "1" {
t.Fatalf("Header ExpectLastSequencePerSubject not set: %+v", m.Header)
}
}

func TestJetStreamSubscribe(t *testing.T) {
Expand Down

0 comments on commit 61828e8

Please sign in to comment.