Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements on raft leader handoff. #4142

Merged
merged 2 commits into from May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -2376,7 +2376,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
ci := js.clusterInfo(rg)
mset.checkClusterInfo(ci)

newPeers, _, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas)
newPeers, oldPeers, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas)

// If we are part of the new peerset and we have been passed the baton.
// We will handle scale down.
Expand All @@ -2402,6 +2402,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
continue
}

// We are good to go, can scale down here.
for _, p := range oldPeers {
n.ProposeRemovePeer(p)
}

csa := sa.copyGroup()
csa.Group.Peers = newPeers
csa.Group.Preferred = ourPeerId
Expand All @@ -2425,6 +2430,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Check if we have a quorom.
if current >= neededCurrent {
s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader)
n.UpdateKnownPeers(newPeers)
n.StepDown(newLeaderPeer)
}
}
Expand Down
11 changes: 10 additions & 1 deletion server/raft.go
Expand Up @@ -1364,7 +1364,15 @@ func (n *raft) StepDown(preferred ...string) error {
if maybeLeader != noLeader {
n.debug("Selected %q for new leader", maybeLeader)
prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader)))
time.AfterFunc(250*time.Millisecond, func() { stepdown.push(noLeader) })
time.AfterFunc(250*time.Millisecond, func() {
n.RLock()
stillLeader := n.state == Leader
n.RUnlock()
// If we are still the leader force a stepdown.
if stillLeader {
stepdown.push(noLeader)
}
})
} else {
// Force us to stepdown here.
n.debug("Stepping down")
Expand Down Expand Up @@ -1402,6 +1410,7 @@ func (n *raft) campaign() error {
func (n *raft) xferCampaign() error {
n.debug("Starting transfer campaign")
if n.state == Leader {
n.lxfer = false
return errAlreadyLeader
}
n.resetElect(10 * time.Millisecond)
Expand Down