Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak consumer replica scaling #4404

Merged
merged 3 commits into from Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions server/consumer.go
Expand Up @@ -213,14 +213,14 @@ var (

// Calculate accurate replicas for the consumer config with the parent stream config.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
if consCfg.Replicas == 0 {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy {
if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 {
// Matches old-school ephemerals only, where the replica count is 0.
return 1
neilalexander marked this conversation as resolved.
Show resolved Hide resolved
}
return strCfg.Replicas
} else {
return consCfg.Replicas
}
return consCfg.Replicas
}

// Consumer is a jetstream consumer.
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Expand Up @@ -4001,7 +4001,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
var didCreate, isConfigUpdate, needsLocalResponse bool
if o == nil {
// Add in the consumer if needed.
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil {
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil {
didCreate = true
}
} else {
Expand Down
41 changes: 41 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4985,3 +4985,44 @@ func TestJetStreamClusterStreamFailTrackingSnapshots(t *testing.T) {
t.Fatalf("Expected no errors, got %d", len(errCh))
}
}

func TestJetStreamClusterOrphanConsumerSubjects(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>", "bar.>"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "consumer_foo",
Durable: "consumer_foo",
FilterSubject: "foo.something",
})
require_NoError(t, err)

for _, replicas := range []int{3, 1, 3} {
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"bar.>"},
Replicas: replicas,
})
require_NoError(t, err)
c.waitOnAllCurrent()
}

c.waitOnStreamLeader("$G", "TEST")
c.waitOnConsumerLeader("$G", "TEST", "consumer_foo")

info, err := js.ConsumerInfo("TEST", "consumer_foo")
require_NoError(t, err)
require_True(t, info.Cluster != nil)
require_NotEqual(t, info.Cluster.Leader, "")
require_Equal(t, len(info.Cluster.Replicas), 2)
}
9 changes: 4 additions & 5 deletions server/test_test.go
Expand Up @@ -14,7 +14,6 @@
package server

import (
"bytes"
"fmt"
"math/rand"
"net/url"
Expand Down Expand Up @@ -112,16 +111,16 @@ func require_Error(t *testing.T, err error, expected ...error) {
t.Fatalf("Expected one of %v, got '%v'", expected, err)
}

func require_Equal(t *testing.T, a, b string) {
func require_Equal[T comparable](t *testing.T, a, b T) {
t.Helper()
if strings.Compare(a, b) != 0 {
if a != b {
t.Fatalf("require equal, but got: %v != %v", a, b)
}
}

func require_NotEqual(t *testing.T, a, b [32]byte) {
func require_NotEqual[T comparable](t *testing.T, a, b T) {
t.Helper()
if bytes.Equal(a[:], b[:]) {
if a == b {
t.Fatalf("require not equal, but got: %v != %v", a, b)
}
}
Expand Down