Skip to content

Commit

Permalink
Merge pull request #958 from bruth/fix-exp-seq-headers
Browse files Browse the repository at this point in the history
Change expected last sequence pub options to pointers
  • Loading branch information
kozlovic committed Apr 16, 2022
2 parents c75dfd5 + dc7e667 commit 68c2aa3
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 14 deletions.
28 changes: 14 additions & 14 deletions js.go
Expand Up @@ -336,10 +336,10 @@ type pubOpts struct {
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq uint64 // Expected last sequence
lss uint64 // Expected last sequence per subject
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject

// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
Expand Down Expand Up @@ -415,11 +415,11 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.str != _EMPTY_ {
m.Header.Set(ExpectedStreamHdr, o.str)
}
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
if o.seq != nil {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}

var resp *Msg
Expand Down Expand Up @@ -749,11 +749,11 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.str != _EMPTY_ {
m.Header.Set(ExpectedStreamHdr, o.str)
}
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
if o.seq != nil {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}

// Reply
Expand Down Expand Up @@ -822,15 +822,15 @@ func ExpectStream(stream string) PubOpt {
// ExpectLastSequence sets the expected sequence in the response from the publish.
func ExpectLastSequence(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.seq = seq
opts.seq = &seq
return nil
})
}

// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func ExpectLastSequencePerSubject(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lss = seq
opts.lss = &seq
return nil
})
}
Expand Down
79 changes: 79 additions & 0 deletions test/js_test.go
Expand Up @@ -5481,6 +5481,85 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds())
}

func TestJetStreamPublishExpectZero(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test", "foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Errorf("Error: %s", err)
}

// Explicitly set the header to zero.
_, err = js.Publish("foo", []byte("bar"),
nats.ExpectLastSequence(0),
nats.ExpectLastSequencePerSubject(0),
)
if err != nil {
t.Errorf("Error: %v", err)
}

rawMsg, err := js.GetMsg("TEST", 1)
if err != nil {
t.Fatalf("Error: %s", err)
}
hdr, ok := rawMsg.Header["Nats-Expected-Last-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got := hdr[0]
expected := "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}
hdr, ok = rawMsg.Header["Nats-Expected-Last-Subject-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got = hdr[0]
expected = "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}

msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Error: %s", err)
}
hdr, ok = msg.Header["Nats-Expected-Last-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got = hdr[0]
expected = "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}
hdr, ok = msg.Header["Nats-Expected-Last-Subject-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got = hdr[0]
expected = "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}
}

func TestJetStreamBindConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit 68c2aa3

Please sign in to comment.