Skip to content

Commit

Permalink
Stream migration update (#4104)
Browse files Browse the repository at this point in the history
I noticed that stream migration could be delayed when leadership was
transferred while the new leader was still paused for an upper layer
catchup, resulting in a downgrade to a normal lost quorum vote. This
allows a leadership transfer to move ahead once the upper layer resumes.
Also check more quickly at first, but slow down if the state we need to
have is not there yet.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Apr 26, 2023
2 parents 08d3418 + 83293f8 commit aea4a41
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
32 changes: 21 additions & 11 deletions server/jetstream_cluster.go
Expand Up @@ -2013,8 +2013,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

startMigrationMonitoring := func() {
if mmt == nil {
mmt = time.NewTicker(500 * time.Millisecond)
mmt = time.NewTicker(10 * time.Millisecond)
mmtc = mmt.C
}
}

// adjustMigrationMonitoring moves the migration monitor ticker to a 500ms interval.
// If no ticker exists yet, one is created at that interval and its channel is stored in mmtc.
// Otherwise the existing ticker is reset in place, so mmtc stays valid.
adjustMigrationMonitoring := func() {
const delay = 500 * time.Millisecond
if mmt == nil {
mmt = time.NewTicker(delay)
mmtc = mmt.C
} else {
mmt.Reset(delay)
}
}

Expand Down Expand Up @@ -2242,8 +2252,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

// Check to see where we are..
rg := mset.raftGroup()
ci := js.clusterInfo(rg)
mset.checkClusterInfo(ci)

// Track the new peers and check the ones that are current.
mset.mu.RLock()
Expand All @@ -2255,7 +2263,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
continue
}

newPeers, oldPeers, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas)
// Adjust to our normal time delay.
adjustMigrationMonitoring()

// Make sure we have correct cluster information on the other peers.
ci := js.clusterInfo(rg)
mset.checkClusterInfo(ci)

newPeers, _, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas)

// If we are part of the new peerset and we have been passed the baton.
// We will handle scale down.
Expand All @@ -2281,23 +2296,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
continue
}

// We are good to go, can scale down here.
for _, p := range oldPeers {
n.ProposeRemovePeer(p)
}

csa := sa.copyGroup()
csa.Group.Peers = newPeers
csa.Group.Preferred = ourPeerId
csa.Group.Cluster = s.cachedClusterName()
cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
s.Noticef("Scaling down '%s > %s' to %+v", accName, sa.Config.Name, s.peerSetToNames(newPeers))

} else {
// We are the old leader here, from the original peer set.
// We are simply waiting on the new peerset to be caught up so we can transfer leadership.
var newLeaderPeer, newLeader string
neededCurrent, current := replicas/2+1, 0

for _, r := range ci.Replicas {
if r.Current && newPeerSet[r.Peer] {
current++
Expand Down Expand Up @@ -6877,7 +6887,7 @@ func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int

// Threshold for compression.
// TODO(dlc) - Eventually make configurable.
const compressThreshold = 4 * 1024
const compressThreshold = 256

// If allowed and contents over the threshold we will compress.
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte {
Expand Down
21 changes: 17 additions & 4 deletions server/raft.go
Expand Up @@ -855,7 +855,13 @@ func (n *raft) ResumeApply() {
}
}
n.hcommit = 0
n.resetElectionTimeout()

// If we had been selected to be the next leader, campaign here now that we have resumed.
if n.lxfer {
n.xferCampaign()
} else {
n.resetElectionTimeout()
}
}

// Applied is to be called when the FSM has applied the committed entries.
Expand Down Expand Up @@ -3158,9 +3164,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Only process these if they are new, so no replays or catchups.
if isNew {
maybeLeader := string(e.Data)
if maybeLeader == n.id && !n.observer && !n.paused {
n.lxfer = true
n.xferCampaign()
// This is us. We need to check if we can become the leader.
if maybeLeader == n.id {
// If not an observer and not paused we are good to go.
if !n.observer && !n.paused {
n.lxfer = true
n.xferCampaign()
} else if n.paused && !n.pobserver {
// Here we can become a leader but need to wait for resume of the apply channel.
n.lxfer = true
}
}
}
case EntryAddPeer:
Expand Down

0 comments on commit aea4a41

Please sign in to comment.