Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] ExpectLastSequencePerSubject() publish option #797

Merged
merged 1 commit into from Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 20 additions & 4 deletions js.go
Expand Up @@ -260,6 +260,7 @@ type pubOpts struct {
lid string // Expected last msgId
str string // Expected stream name
seq uint64 // Expected last sequence
lss uint64 // Expected last sequence per subject
}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
Expand All @@ -278,10 +279,11 @@ type PubAck struct {

// Headers for published messages.
const (
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
)

// PublishMsg publishes a Msg to a stream from JetStream.
Expand Down Expand Up @@ -317,6 +319,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
}

var resp *Msg
var err error
Expand Down Expand Up @@ -618,6 +623,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
}

// Reply
if m.Reply != _EMPTY_ {
Expand Down Expand Up @@ -687,6 +695,14 @@ func ExpectLastSequence(seq uint64) PubOpt {
})
}

// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func ExpectLastSequencePerSubject(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lss = seq
return nil
})
}

// ExpectLastMsgId sets the expected last msgId in the response from the publish.
func ExpectLastMsgId(id string) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
Expand Down
14 changes: 14 additions & 0 deletions test/js_test.go
Expand Up @@ -264,6 +264,20 @@ func TestJetStreamPublish(t *testing.T) {
if err != context.Canceled {
t.Fatalf("Expected %q, got %q", context.Canceled, err)
}

// Test ExpectLastSequencePerSubject. Just make sure that we set the header.
sub, err = nc.SubscribeSync("test")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
js.Publish("test", []byte("msg"), nats.ExpectLastSequencePerSubject(1))
m, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on next msg: %v", err)
}
if m.Header.Get(nats.ExpectedLastSubjSeqHdr) != "1" {
t.Fatalf("Header ExpectLastSequencePerSubject not set: %+v", m.Header)
}
}

func TestJetStreamSubscribe(t *testing.T) {
Expand Down