Skip to content

Commit

Permalink
[FIXED] Can't scale up some older streams (#4146)
Browse files Browse the repository at this point in the history
For some older R1 streams created by previous servers we could have no
cluster for the stream assignment group which would prevent scale up
with newer servers.

When the cluster is detected as absent, it is now inherited from the
placement cluster directive or, failing that, from the client's cluster designation.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 11, 2023
2 parents 2f2498a + 5e029d0 commit bdb0ba9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
12 changes: 12 additions & 0 deletions server/jetstream_cluster.go
Expand Up @@ -5868,6 +5868,18 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
if isReplicaChange {
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
// Check whether a cluster is assigned and, if not, make sure we try to
// pick one. This could happen with older streams that were assigned by
// previous servers.
if rg.Cluster == _EMPTY_ {
// Prefer placement directives if we have them.
if newCfg.Placement != nil && newCfg.Placement.Cluster != _EMPTY_ {
rg.Cluster = newCfg.Placement.Cluster
} else {
// Fall back to the cluster assignment from the client.
rg.Cluster = ci.Cluster
}
}
peers, err := cc.selectPeerGroup(newCfg.Replicas, rg.Cluster, newCfg, rg.Peers, 0, nil)
if err != nil {
resp.Error = NewJSClusterNoPeersError(err)
Expand Down
49 changes: 49 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -3997,3 +3997,52 @@ func TestJetStreamClusterStreamAccountingDriftFixups(t *testing.T) {
require_NoError(t, err)
require_True(t, jsz.JetStreamStats.Store == 0)
}

// TestJetStreamClusterStreamScaleUpNoGroupCluster checks that a stream whose
// assignment group carries no cluster name can still be scaled up to three replicas.
//
// Some older streams seem to have been created or exist with no explicit cluster setting.
// For server <= 2.9.16 you could not scale the streams up since we could not place them in another cluster.
func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "NATS", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Create the stream without an explicit replica count.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
	})
	require_NoError(t, err)

	// Manually grab the stream assignment so we can strip the group cluster from it.
	s := c.streamLeader(globalAccountName, "TEST")
	mset, err := s.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)

	sa := mset.streamAssignment()
	require_NotNil(t, sa)
	// Work on a copy so the stream's own assignment is not modified directly.
	sa = sa.copyGroup()
	// Remove cluster and preferred.
	sa.Group.Cluster = _EMPTY_
	sa.Group.Preferred = _EMPTY_
	// Insert the modified assignment into the meta layer.
	s.mu.RLock()
	s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
	s.mu.RUnlock()

	// Make sure it got propagated.
	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
		// Check for nil before touching the assignment; returning an error
		// lets checkFor poll again instead of dereferencing a nil pointer.
		cur := mset.streamAssignment()
		if cur == nil {
			return fmt.Errorf("No stream assignment")
		}
		if cur.Group.Cluster != _EMPTY_ {
			return fmt.Errorf("Cluster still not cleared")
		}
		return nil
	})

	// Now we know the cluster has been cleared. Make sure we can scale up.
	_, err = js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
		Replicas: 3,
	})
	require_NoError(t, err)
}

0 comments on commit bdb0ba9

Please sign in to comment.