From fcc7c443247aaa5aad61c33b076259d65ab1e042 Mon Sep 17 00:00:00 2001 From: Deepak Sah Date: Fri, 29 Jul 2022 23:37:11 +0530 Subject: [PATCH] Allow setting consumer replicas though options (#1019) [ADDED] Allow setting consumer replicas though options --- example_test.go | 5 +++++ js.go | 14 ++++++++++++++ test/js_test.go | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/example_test.go b/example_test.go index 08616a73d..42e3f322d 100644 --- a/example_test.go +++ b/example_test.go @@ -625,6 +625,11 @@ func ExampleSubOpt() { js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.ManualAck(), nats.MaxDeliver(2), nats.BackOff([]time.Duration{50 * time.Millisecond, 250 * time.Millisecond})) + + // Set consumer replicas count for a durable while subscribing. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.Durable("FOO"), nats.ConsumerReplicas(1)) } func ExampleMaxWait() { diff --git a/js.go b/js.go index f3c429340..5ea6c9fb9 100644 --- a/js.go +++ b/js.go @@ -1332,6 +1332,9 @@ func checkConfig(s, u *ConsumerConfig) error { if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat { return makeErr("heartbeat", u.Heartbeat, s.Heartbeat) } + if u.Replicas > 0 && u.Replicas != s.Replicas { + return makeErr("replicas", u.Replicas, s.Replicas) + } return nil } @@ -2449,6 +2452,17 @@ func InactiveThreshold(threshold time.Duration) SubOpt { }) } +// ConsumerReplicas sets the number of replica count for a consumer. +func ConsumerReplicas(replicas int) SubOpt { + return subOptFn(func(opts *subOpts) error { + if replicas < 1 { + return fmt.Errorf("invalid ConsumerReplicas value (%v), needs to be greater than 0", replicas) + } + opts.cfg.Replicas = replicas + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/test/js_test.go b/test/js_test.go index a5482d657..7aec29d5c 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -7272,3 +7272,36 @@ func TestJetStreamDirectGetMsg(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } } + +func TestJetStreamConsumerReplicasOption(t *testing.T) { + withJSCluster(t, "CR", 3, func(t *testing.T, nodes ...*jsServer) { + nc, js := jsClient(t, nodes[0].Server) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "ConsumerReplicasTest", + Subjects: []string{"foo"}, + Replicas: 3, + }); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + // Subscribe to the stream with a durable consumer "bar" and replica set to 1. + cb := func(msg *nats.Msg) {} + _, err := js.Subscribe("foo", cb, nats.Durable("bar"), nats.ConsumerReplicas(1)) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + // Get consumer info + consInfo, err := js.ConsumerInfo("ConsumerReplicasTest", "bar") + if err != nil { + t.Fatalf("Error getting consumer info: %v", err) + } + + // Check if the number of replicas is the same as we provided. + if consInfo.Config.Replicas != 1 { + t.Fatalf("Expected consumer replica to be %v, got %+v", 1, consInfo.Config.Replicas) + } + }) +}