Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change expected last sequence pub options to pointers #958

Merged
merged 2 commits into from Apr 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 14 additions & 14 deletions js.go
Expand Up @@ -336,10 +336,10 @@ type pubOpts struct {
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq uint64 // Expected last sequence
lss uint64 // Expected last sequence per subject
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject

// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
Expand Down Expand Up @@ -415,11 +415,11 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.str != _EMPTY_ {
m.Header.Set(ExpectedStreamHdr, o.str)
}
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
if o.seq != nil {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}

var resp *Msg
Expand Down Expand Up @@ -749,11 +749,11 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.str != _EMPTY_ {
m.Header.Set(ExpectedStreamHdr, o.str)
}
if o.seq > 0 {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
if o.seq != nil {
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*o.seq, 10))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to say that it does not matter since '0' is not used by the server, which is true for that header (https://github.com/nats-io/nats-server/blob/0df5da3924799cfab51151c1143421fca3bcb4c8/server/stream.go#L2957) but not for the other indeed (https://github.com/nats-io/nats-server/blob/0df5da3924799cfab51151c1143421fca3bcb4c8/server/stream.go#L2996).

So I think that we could accept the change, and this should not break existing apps. Let's see if @wallyqs has a comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would also be nice if the server had it on both for consistency?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kozlovic Thanks for pointing that out. FWIW, I agree with @ripienaar that it should be the same behavior for both subject and stream-level.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I created a PR for the server but changed it to Draft. As Derek mentioned, this would work only for the very first message since after that, the stream first sequence should never go back to 0. There may still be value, but being close to 2.8.0 release, although we could add it as a "CHANGED" entry in the release notes, maybe we will wait for the 2.9.0 release as RI suggested.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kozlovic Thanks and understood. I presume we can still move forward with this PR so the subject-level header will have its behavior corrected?

}
if o.lss > 0 {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}

// Reply
Expand Down Expand Up @@ -822,15 +822,15 @@ func ExpectStream(stream string) PubOpt {
// ExpectLastSequence sets the expected sequence in the response from the publish.
func ExpectLastSequence(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.seq = seq
opts.seq = &seq
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a test like this to cover the behavior change, header is persisted now along with the message but should be fine.

func TestJetStreamPublishExpectZero(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer shutdownJSServerAndRemoveStorage(t, s)

	nc, js := jsClient(t, s)
	defer nc.Close()

	var err error

	// Create the stream using our client API.
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"test", "foo", "bar"},
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	sub, err := nc.SubscribeSync("foo")
	if err != nil {
		t.Errorf("Error: %s", err)
	}

	// Explicitly set the header to zero.
	_, err = js.Publish("foo", []byte("bar"), nats.ExpectLastSequence(0))
	if err != nil {
		t.Errorf("Error: %v", err)
	}

	rawMsg, err := js.GetMsg("TEST", 1)
	if err != nil {
		t.Fatalf("Error: %s", err)
	}
	hdr, ok := rawMsg.Header["Nats-Expected-Last-Sequence"]
	if !ok {
		t.Fatal("Missing header")
	}
	got := hdr[0]
	expected := "0"
	if got != expected {
		t.Fatalf("Expected %v, got: %v", expected, got)
	}

	msg, err := sub.NextMsg(1*time.Second)
	if err != nil {
		t.Fatalf("Error: %s", err)
	}
	hdr, ok = msg.Header["Nats-Expected-Last-Sequence"]
	if !ok {
		t.Fatal("Missing header")
	}
	got = hdr[0]
	expected = "0"
	if got != expected {
		t.Fatalf("Expected %v, got: %v", expected, got)
	}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wallyqs Added a test to check both headers, thanks for stubbing this out.

return nil
})
}

// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func ExpectLastSequencePerSubject(seq uint64) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.lss = seq
opts.lss = &seq
return nil
})
}
Expand Down
79 changes: 79 additions & 0 deletions test/js_test.go
Expand Up @@ -5481,6 +5481,85 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) {
fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds())
}

func TestJetStreamPublishExpectZero(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test", "foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Errorf("Error: %s", err)
}

// Explicitly set the header to zero.
_, err = js.Publish("foo", []byte("bar"),
nats.ExpectLastSequence(0),
nats.ExpectLastSequencePerSubject(0),
)
if err != nil {
t.Errorf("Error: %v", err)
}

rawMsg, err := js.GetMsg("TEST", 1)
if err != nil {
t.Fatalf("Error: %s", err)
}
hdr, ok := rawMsg.Header["Nats-Expected-Last-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got := hdr[0]
expected := "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}
hdr, ok = rawMsg.Header["Nats-Expected-Last-Subject-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got = hdr[0]
expected = "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}

msg, err := sub.NextMsg(1 * time.Second)
if err != nil {
t.Fatalf("Error: %s", err)
}
hdr, ok = msg.Header["Nats-Expected-Last-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got = hdr[0]
expected = "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}
hdr, ok = msg.Header["Nats-Expected-Last-Subject-Sequence"]
if !ok {
t.Fatal("Missing header")
}
got = hdr[0]
expected = "0"
if got != expected {
t.Fatalf("Expected %v, got: %v", expected, got)
}
}

func TestJetStreamBindConsumer(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down