From 5240d8e558dfe6342dd03b3a2d536448b071d344 Mon Sep 17 00:00:00 2001 From: Deepak Date: Fri, 22 Jul 2022 12:24:15 +0530 Subject: [PATCH 1/6] Allow setting consumer replicas though options --- example_test.go | 5 +++++ js.go | 11 +++++++++++ 2 files changed, 16 insertions(+) diff --git a/example_test.go b/example_test.go index 08616a73d..42e3f322d 100644 --- a/example_test.go +++ b/example_test.go @@ -625,6 +625,11 @@ func ExampleSubOpt() { js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.ManualAck(), nats.MaxDeliver(2), nats.BackOff([]time.Duration{50 * time.Millisecond, 250 * time.Millisecond})) + + // Set consumer replicas count for a durable while subscribing. + js.Subscribe("foo", func(msg *nats.Msg) { + fmt.Printf("Received a message: %s\n", string(msg.Data)) + }, nats.Durable("FOO"), nats.ConsumerReplicas(1)) } func ExampleMaxWait() { diff --git a/js.go b/js.go index d9df981d8..f7619d55d 100644 --- a/js.go +++ b/js.go @@ -2413,6 +2413,17 @@ func InactiveThreshold(threshold time.Duration) SubOpt { }) } +// ConsumerReplicas sets the number of replica count for a consumer. +func ConsumerReplicas(replicas int) SubOpt { + return subOptFn(func (opts *subOpts) error { + if replicas < 1 { + return fmt.Errorf("invalid ConsumerReplicas value (%v), needs to be greater than 0", replicas) + } + opts.cfg.Replicas = replicas + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. From c5c2b8ebd28df5eaf1fd00950b28e9a88098b5ed Mon Sep 17 00:00:00 2001 From: Deepak Date: Fri, 22 Jul 2022 12:33:52 +0530 Subject: [PATCH 2/6] Run go fmt --- js.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js.go b/js.go index f7619d55d..1c8d33091 100644 --- a/js.go +++ b/js.go @@ -2415,7 +2415,7 @@ func InactiveThreshold(threshold time.Duration) SubOpt { // ConsumerReplicas sets the number of replica count for a consumer. func ConsumerReplicas(replicas int) SubOpt { - return subOptFn(func (opts *subOpts) error { + return subOptFn(func(opts *subOpts) error { if replicas < 1 { return fmt.Errorf("invalid ConsumerReplicas value (%v), needs to be greater than 0", replicas) } From f732c53425f045e47c548758749a05d7cba1e6e9 Mon Sep 17 00:00:00 2001 From: Deepak Date: Thu, 28 Jul 2022 17:42:26 +0530 Subject: [PATCH 3/6] Add test for consumer replica option --- js.go | 3 +++ js_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/js.go b/js.go index 1c8d33091..29ca1c189 100644 --- a/js.go +++ b/js.go @@ -1299,6 +1299,9 @@ func checkConfig(s, u *ConsumerConfig) error { if u.Heartbeat > 0 && u.Heartbeat != s.Heartbeat { return makeErr("heartbeat", u.Heartbeat, s.Heartbeat) } + if u.Replicas > 0 && u.Replicas != s.Replicas { + return makeErr("replicas", u.Replicas, s.Replicas) + } return nil } diff --git a/js_test.go b/js_test.go index 2cc51ac8c..e8ed5b574 100644 --- a/js_test.go +++ b/js_test.go @@ -1026,3 +1026,33 @@ func TestJetStreamClusterPlacement(t *testing.T) { t.Fatalf("Unexpected tag: %q", v) } } + +func TestConsumerReplicasOption(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + if _, err := js.AddStream(&StreamConfig{Name: "CONSUMER_REPLICAS_TEST", Subjects: []string{"foo"}}); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + cb := func(msg *Msg) { return } + // Subscribe to the stream with a durable consumer "bar". + _, err := js.Subscribe("foo", cb, Durable("bar"), ConsumerReplicas(1)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Get consumer info + consInfo, err := js.ConsumerInfo("CONSUMER_REPLICAS_TEST", "bar") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Check if the number of replicas is the same as we provided. + if consInfo.Config.Replicas != 1 { + t.Fatalf("Expected consumer replica to be %v, got %+v", 1, consInfo.Config.Replicas) + } +} From 122e3a13b356a1d2cd27ffaeee66cc4c9b4b5eeb Mon Sep 17 00:00:00 2001 From: Deepak Date: Thu, 28 Jul 2022 17:55:16 +0530 Subject: [PATCH 4/6] Remove redundant return statement --- js_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js_test.go b/js_test.go index e8ed5b574..870a834e9 100644 --- a/js_test.go +++ b/js_test.go @@ -1038,7 +1038,7 @@ func TestConsumerReplicasOption(t *testing.T) { t.Fatalf("Error adding stream: %v", err) } - cb := func(msg *Msg) { return } + cb := func(msg *Msg) {} // Subscribe to the stream with a durable consumer "bar". _, err := js.Subscribe("foo", cb, Durable("bar"), ConsumerReplicas(1)) if err != nil { From 1d9feeb5c599beb272a493011460f815b22fdbbf Mon Sep 17 00:00:00 2001 From: Deepak Date: Fri, 29 Jul 2022 17:37:46 +0530 Subject: [PATCH 5/6] Test consumer replica option with clustered JetStream --- js_test.go | 30 ------------------------------ test/js_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/js_test.go b/js_test.go index 870a834e9..2cc51ac8c 100644 --- a/js_test.go +++ b/js_test.go @@ -1026,33 +1026,3 @@ func TestJetStreamClusterPlacement(t *testing.T) { t.Fatalf("Unexpected tag: %q", v) } } - -func TestConsumerReplicasOption(t *testing.T) { - s := RunBasicJetStreamServer() - defer shutdownJSServerAndRemoveStorage(t, s) - - nc, js := jsClient(t, s) - defer nc.Close() - - if _, err := js.AddStream(&StreamConfig{Name: "CONSUMER_REPLICAS_TEST", Subjects: []string{"foo"}}); err != nil { - t.Fatalf("Error adding stream: %v", err) - } - - cb := func(msg *Msg) {} - // Subscribe to the stream with a durable consumer "bar". - _, err := js.Subscribe("foo", cb, Durable("bar"), ConsumerReplicas(1)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Get consumer info - consInfo, err := js.ConsumerInfo("CONSUMER_REPLICAS_TEST", "bar") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Check if the number of replicas is the same as we provided. - if consInfo.Config.Replicas != 1 { - t.Fatalf("Expected consumer replica to be %v, got %+v", 1, consInfo.Config.Replicas) - } -} diff --git a/test/js_test.go b/test/js_test.go index de2efbf1a..b13c447c0 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -7170,3 +7170,36 @@ func TestJetStreamRePublish(t *testing.T) { lseq[m.Subject] = seq } } + +func TestJetStreamConsumerReplicasOption(t *testing.T) { + withJSCluster(t, "CR", 3, func(t *testing.T, nodes ...*jsServer) { + nc, js := jsClient(t, nodes[0].Server) + defer nc.Close() + + if _, err := js.AddStream(&nats.StreamConfig{ + Name: "ConsumerReplicasTest", + Subjects: []string{"foo"}, + Replicas: 3, + }); err != nil { + t.Fatalf("Error adding stream: %v", err) + } + + // Subscribe to the stream with a durable consumer "bar" and replica set to 1. + cb := func(msg *nats.Msg) {} + _, err := js.Subscribe("foo", cb, nats.Durable("bar"), nats.ConsumerReplicas(1)) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + // Get consumer info + consInfo, err := js.ConsumerInfo("ConsumerReplicasTest", "bar") + if err != nil { + t.Fatalf("Error getting consumer info: %v", err) + } + + // Check if the number of replicas is the same as we provided. + if consInfo.Config.Replicas != 1 { + t.Fatalf("Expected consumer replica to be %v, got %+v", 1, consInfo.Config.Replicas) + } + }) +} From dbc90caeff2ca8628bb4e98c55590af3b59ebfa7 Mon Sep 17 00:00:00 2001 From: Deepak Date: Fri, 29 Jul 2022 23:11:23 +0530 Subject: [PATCH 6/6] Run go fmt --- test/js_test.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/test/js_test.go b/test/js_test.go index c6dff245f..7aec29d5c 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -7279,29 +7279,29 @@ func TestJetStreamConsumerReplicasOption(t *testing.T) { defer nc.Close() if _, err := js.AddStream(&nats.StreamConfig{ - Name: "ConsumerReplicasTest", - Subjects: []string{"foo"}, - Replicas: 3, - }); err != nil { - t.Fatalf("Error adding stream: %v", err) - } + Name: "ConsumerReplicasTest", + Subjects: []string{"foo"}, + Replicas: 3, + }); err != nil { + t.Fatalf("Error adding stream: %v", err) + } - // Subscribe to the stream with a durable consumer "bar" and replica set to 1. - cb := func(msg *nats.Msg) {} - _, err := js.Subscribe("foo", cb, nats.Durable("bar"), nats.ConsumerReplicas(1)) - if err != nil { - t.Fatalf("Error on subscribe: %v", err) - } + // Subscribe to the stream with a durable consumer "bar" and replica set to 1. + cb := func(msg *nats.Msg) {} + _, err := js.Subscribe("foo", cb, nats.Durable("bar"), nats.ConsumerReplicas(1)) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } - // Get consumer info - consInfo, err := js.ConsumerInfo("ConsumerReplicasTest", "bar") - if err != nil { - t.Fatalf("Error getting consumer info: %v", err) - } + // Get consumer info + consInfo, err := js.ConsumerInfo("ConsumerReplicasTest", "bar") + if err != nil { + t.Fatalf("Error getting consumer info: %v", err) + } - // Check if the number of replicas is the same as we provided. - if consInfo.Config.Replicas != 1 { - t.Fatalf("Expected consumer replica to be %v, got %+v", 1, consInfo.Config.Replicas) - } + // Check if the number of replicas is the same as we provided. + if consInfo.Config.Replicas != 1 { + t.Fatalf("Expected consumer replica to be %v, got %+v", 1, consInfo.Config.Replicas) + } }) }