Skip to content

Commit

Permalink
Improvements on raft leader handoff. (#4142)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 10, 2023
2 parents 6e6ce3a + b9af0d0 commit b951cd1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
8 changes: 7 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -2376,7 +2376,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
ci := js.clusterInfo(rg)
mset.checkClusterInfo(ci)

newPeers, _, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas)
newPeers, oldPeers, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas)

// If we are part of the new peerset and we have been passed the baton.
// We will handle scale down.
Expand All @@ -2402,6 +2402,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
continue
}

// We are good to go, can scale down here.
for _, p := range oldPeers {
n.ProposeRemovePeer(p)
}

csa := sa.copyGroup()
csa.Group.Peers = newPeers
csa.Group.Preferred = ourPeerId
Expand All @@ -2425,6 +2430,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Check if we have a quorom.
if current >= neededCurrent {
s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader)
n.UpdateKnownPeers(newPeers)
n.StepDown(newLeaderPeer)
}
}
Expand Down
11 changes: 10 additions & 1 deletion server/raft.go
Expand Up @@ -1364,7 +1364,15 @@ func (n *raft) StepDown(preferred ...string) error {
if maybeLeader != noLeader {
n.debug("Selected %q for new leader", maybeLeader)
prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader)))
time.AfterFunc(250*time.Millisecond, func() { stepdown.push(noLeader) })
time.AfterFunc(250*time.Millisecond, func() {
n.RLock()
stillLeader := n.state == Leader
n.RUnlock()
// If we are still the leader force a stepdown.
if stillLeader {
stepdown.push(noLeader)
}
})
} else {
// Force us to stepdown here.
n.debug("Stepping down")
Expand Down Expand Up @@ -1402,6 +1410,7 @@ func (n *raft) campaign() error {
func (n *raft) xferCampaign() error {
n.debug("Starting transfer campaign")
if n.state == Leader {
n.lxfer = false
return errAlreadyLeader
}
n.resetElect(10 * time.Millisecond)
Expand Down

0 comments on commit b951cd1

Please sign in to comment.