diff --git a/js.go b/js.go index 34389d098..7b547c6c7 100644 --- a/js.go +++ b/js.go @@ -1732,6 +1732,15 @@ func DeliverLast() SubOpt { }) } +// DeliverLastPerSubject configures a Consumer to receive messages +// starting with the latest one for each filtered subject. +func DeliverLastPerSubject() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy + return nil + }) +} + // DeliverNew configures a Consumer to receive messages // published after the subscription. func DeliverNew() SubOpt { @@ -2523,6 +2532,10 @@ const ( // DeliverByStartTimePolicy will deliver messages starting from a given // time. DeliverByStartTimePolicy + + // DeliverLastPerSubjectPolicy will start the consumer with the last message + // for all subjects received. + DeliverLastPerSubjectPolicy ) func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { @@ -2537,6 +2550,8 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { *p = DeliverByStartSequencePolicy case jsonString("by_start_time"): *p = DeliverByStartTimePolicy + case jsonString("last_per_subject"): + *p = DeliverLastPerSubjectPolicy } return nil @@ -2554,6 +2569,8 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) { return json.Marshal("by_start_sequence") case DeliverByStartTimePolicy: return json.Marshal("by_start_time") + case DeliverLastPerSubjectPolicy: + return json.Marshal("last_per_subject") default: return nil, fmt.Errorf("nats: unknown deliver policy %v", p) } diff --git a/test/js_test.go b/test/js_test.go index 660029d04..613744c4a 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2343,7 +2343,7 @@ func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) { // Create the stream using our client API. _, err = js.AddStream(&nats.StreamConfig{ Name: "TEST", - Subjects: []string{"foo"}, + Subjects: []string{"foo", "bar"}, }) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2405,6 +2405,21 @@ func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) { } }) } + + js.Publish("bar", []byte("bar msg 1")) + js.Publish("bar", []byte("bar msg 2")) + + sub, err := js.SubscribeSync("bar", nats.BindStream("TEST"), nats.DeliverLastPerSubject()) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on next msg: %v", err) + } + if string(msg.Data) != "bar msg 2" { + t.Fatalf("Unexepcted last message: %q", msg.Data) + } } func TestJetStreamSubscribe_AckPolicy(t *testing.T) {