Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Can't scale up some older streams #4146

Merged
merged 1 commit into from May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions server/jetstream_cluster.go
Expand Up @@ -5868,6 +5868,18 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
if isReplicaChange {
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
// Check if we do not have a cluster assigned, and if we do not make sure we
// try to pick one. This could happen with older streams that were assigned by
// previous servers.
if rg.Cluster == _EMPTY_ {
// Prefer placement directrives if we have them.
if newCfg.Placement != nil && newCfg.Placement.Cluster != _EMPTY_ {
rg.Cluster = newCfg.Placement.Cluster
} else {
// Fall back to the cluster assignment from the client.
rg.Cluster = ci.Cluster
}
}
peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0, nil)
if err != nil {
resp.Error = NewJSClusterNoPeersError(err)
Expand Down
49 changes: 49 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3997,3 +3997,52 @@ func TestJetStreamClusterStreamAccountingDriftFixups(t *testing.T) {
require_NoError(t, err)
require_True(t, jsz.JetStreamStats.Store == 0)
}

// Some older streams seem to have been created or exist with no explicit cluster setting.
// For server <= 2.9.16 you could not scale the streams up since we could not place them in another cluster.
func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
})
require_NoError(t, err)

// Manually going to grab stream assignment and update it to be without the group cluster.
s := c.streamLeader(globalAccountName, "TEST")
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)

sa := mset.streamAssignment()
require_NotNil(t, sa)
// Make copy to not change stream's
sa = sa.copyGroup()
// Remove cluster and preferred.
sa.Group.Cluster = _EMPTY_
sa.Group.Preferred = _EMPTY_
// Insert into meta layer.
s.mu.RLock()
s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
s.mu.RUnlock()
// Make sure it got propagated..
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
sa := mset.streamAssignment().copyGroup()
require_NotNil(t, sa)
if sa.Group.Cluster != _EMPTY_ {
return fmt.Errorf("Cluster still not cleared")
}
return nil
})
// Now we know it has been nil'd out. Make sure we can scale up.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)
}