Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Last per subject delivery policy #798

Merged
merged 1 commit into from Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions js.go
Expand Up @@ -1732,6 +1732,15 @@ func DeliverLast() SubOpt {
})
}

// DeliverLastPerSubject configures a Consumer to receive messages
// starting with the latest one for each filtered subject.
func DeliverLastPerSubject() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
return nil
})
}

// DeliverNew configures a Consumer to receive messages
// published after the subscription.
func DeliverNew() SubOpt {
Expand Down Expand Up @@ -2523,6 +2532,10 @@ const (
// DeliverByStartTimePolicy will deliver messages starting from a given
// time.
DeliverByStartTimePolicy

// DeliverLastPerSubjectPolicy will start the consumer with the last message
// for all subjects received.
DeliverLastPerSubjectPolicy
)

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
Expand All @@ -2537,6 +2550,8 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
*p = DeliverByStartSequencePolicy
case jsonString("by_start_time"):
*p = DeliverByStartTimePolicy
case jsonString("last_per_subject"):
*p = DeliverLastPerSubjectPolicy
}

return nil
Expand All @@ -2554,6 +2569,8 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
return json.Marshal("by_start_sequence")
case DeliverByStartTimePolicy:
return json.Marshal("by_start_time")
case DeliverLastPerSubjectPolicy:
return json.Marshal("last_per_subject")
default:
return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
}
Expand Down
17 changes: 16 additions & 1 deletion test/js_test.go
Expand Up @@ -2343,7 +2343,7 @@ func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) {
// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Subjects: []string{"foo", "bar"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -2405,6 +2405,21 @@ func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) {
}
})
}

js.Publish("bar", []byte("bar msg 1"))
js.Publish("bar", []byte("bar msg 2"))

sub, err := js.SubscribeSync("bar", nats.BindStream("TEST"), nats.DeliverLastPerSubject())
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error on next msg: %v", err)
}
if string(msg.Data) != "bar msg 2" {
t.Fatalf("Unexepcted last message: %q", msg.Data)
}
}

func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
Expand Down