Skip to content

Commit

Permalink
Tweak consumer replica scaling (#4404)
Browse files Browse the repository at this point in the history
This should hopefully catch some consumer scaling situations more
reliably, including cases where the consumer filter subjects no longer
match those of the stream after being scaled down to R1 or after a
cluster restart. I've also added a test to test whether filtered
consumers will scale properly even when the stream subject orphans them.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Aug 18, 2023
2 parents 1e87c3d + c437157 commit ff688ab
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 10 deletions.
8 changes: 4 additions & 4 deletions server/consumer.go
Expand Up @@ -213,14 +213,14 @@ var (

// Calculate accurate replicas for the consumer config with the parent stream config.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
if consCfg.Replicas == 0 {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy {
if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 {
// Matches old-school ephemerals only, where the replica count is 0.
return 1
}
return strCfg.Replicas
} else {
return consCfg.Replicas
}
return consCfg.Replicas
}

// Consumer is a jetstream consumer.
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Expand Up @@ -4001,7 +4001,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
var didCreate, isConfigUpdate, needsLocalResponse bool
if o == nil {
// Add in the consumer if needed.
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil {
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil {
didCreate = true
}
} else {
Expand Down
41 changes: 41 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4985,3 +4985,44 @@ func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) {
t.Fatalf("Expected no errors, got %d", len(errCh))
}
}

func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>", "bar.>"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "consumer_foo",
Durable: "consumer_foo",
FilterSubject: "foo.something",
})
require_NoError(t, err)

for _, replicas := range []int{3, 1, 3} {
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"bar.>"},
Replicas: replicas,
})
require_NoError(t, err)
c.waitOnAllCurrent()
}

c.waitOnStreamLeader("$G", "TEST")
c.waitOnConsumerLeader("$G", "TEST", "consumer_foo")

info, err := js.ConsumerInfo("TEST", "consumer_foo")
require_NoError(t, err)
require_True(t, info.Cluster != nil)
require_NotEqual(t, info.Cluster.Leader, "")
require_Equal(t, len(info.Cluster.Replicas), 2)
}
9 changes: 4 additions & 5 deletions server/test_test.go
Expand Up @@ -14,7 +14,6 @@
package server

import (
"bytes"
"fmt"
"math/rand"
"net/url"
Expand Down Expand Up @@ -112,16 +111,16 @@ func require_Error(t *testing.T, err error, expected ...error) {
t.Fatalf("Expected one of %v, got '%v'", expected, err)
}

func require_Equal(t *testing.T, a, b string) {
func require_Equal[T comparable](t *testing.T, a, b T) {
t.Helper()
if strings.Compare(a, b) != 0 {
if a != b {
t.Fatalf("require equal, but got: %v != %v", a, b)
}
}

func require_NotEqual(t *testing.T, a, b [32]byte) {
func require_NotEqual[T comparable](t *testing.T, a, b T) {
t.Helper()
if bytes.Equal(a[:], b[:]) {
if a == b {
t.Fatalf("require not equal, but got: %v != %v", a, b)
}
}
Expand Down

0 comments on commit ff688ab

Please sign in to comment.