New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change expected last sequence pub options to pointers #958
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -336,10 +336,10 @@ type pubOpts struct { | |
ctx context.Context | ||
ttl time.Duration | ||
id string | ||
lid string // Expected last msgId | ||
str string // Expected stream name | ||
seq uint64 // Expected last sequence | ||
lss uint64 // Expected last sequence per subject | ||
lid string // Expected last msgId | ||
str string // Expected stream name | ||
seq *uint64 // Expected last sequence | ||
lss *uint64 // Expected last sequence per subject | ||
|
||
// Publish retries for NoResponders err. | ||
rwait time.Duration // Retry wait between attempts | ||
|
@@ -415,11 +415,11 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { | |
if o.str != _EMPTY_ { | ||
m.Header.Set(ExpectedStreamHdr, o.str) | ||
} | ||
if o.seq > 0 { | ||
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) | ||
if o.seq != nil { | ||
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10)) | ||
} | ||
if o.lss > 0 { | ||
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10)) | ||
if o.lss != nil { | ||
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10)) | ||
} | ||
|
||
var resp *Msg | ||
|
@@ -749,11 +749,11 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { | |
if o.str != _EMPTY_ { | ||
m.Header.Set(ExpectedStreamHdr, o.str) | ||
} | ||
if o.seq > 0 { | ||
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10)) | ||
if o.seq != nil { | ||
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10)) | ||
} | ||
if o.lss > 0 { | ||
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10)) | ||
if o.lss != nil { | ||
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10)) | ||
} | ||
|
||
// Reply | ||
|
@@ -822,15 +822,15 @@ func ExpectStream(stream string) PubOpt { | |
// ExpectLastSequence sets the expected sequence in the response from the publish. | ||
func ExpectLastSequence(seq uint64) PubOpt { | ||
return pubOptFn(func(opts *pubOpts) error { | ||
opts.seq = seq | ||
opts.seq = &seq | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add a test like this to cover the behavior change, header is persisted now along with the message but should be fine. func TestJetStreamPublishExpectZero(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
nc, js := jsClient(t, s)
defer nc.Close()
var err error
// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test", "foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Errorf("Error: %s", err)
}
// Explicitly set the header to zero.
_, err = js.Publish("foo", []byte("bar"), nats.ExpectLastSequence(0))
if err != nil {
t.Errorf("Error: %v", err)
}
rawMsg, err := js.GetMsg("TEST", 1)
if err != nil {
t.Fatalf("Error: %s", err)
}
hdr, ok := rawMsg.Header["Nats-Expected-Last-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got := hdr[0]
expected := "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}
msg, err := sub.NextMsg(1*time.Second)
if err != nil {
t.Fatalf("Error: %s", err)
}
hdr, ok = msg.Header["Nats-Expected-Last-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got = hdr[0]
expected = "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @wallyqs Added a test to check both headers, thanks for stubbing this out. |
||
return nil | ||
}) | ||
} | ||
|
||
// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish. | ||
func ExpectLastSequencePerSubject(seq uint64) PubOpt { | ||
return pubOptFn(func(opts *pubOpts) error { | ||
opts.lss = seq | ||
opts.lss = &seq | ||
return nil | ||
}) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was going to say that it does not matter since '0' is not used by the server, which is true for that header (https://github.com/nats-io/nats-server/blob/0df5da3924799cfab51151c1143421fca3bcb4c8/server/stream.go#L2957) but not for the other indeed (https://github.com/nats-io/nats-server/blob/0df5da3924799cfab51151c1143421fca3bcb4c8/server/stream.go#L2996).
So I think that we could accept the change, and this should not break existing apps. Let's see if @wallyqs has a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would also be nice if the server had it on both for consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kozlovic Thanks for pointing that out. FWIW, I agree with @ripienaar that it should be the same behavior for both subject and stream-level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I created a PR for the server but changed it to Draft. As Derek mentioned, this would work only for the very first message since after that, the stream first sequence should never go back to 0. There may still be value, but being close to 2.8.0 release, although we could add it as a "CHANGED" entry in the release notes, maybe we will wait for the 2.9.0 release as RI suggested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kozlovic Thanks and understood. I presume we can still move forward with this PR so the subject-level header will have its behavior corrected?