Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream migration update #4104

Merged
merged 2 commits into from Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 21 additions & 11 deletions server/jetstream_cluster.go
Expand Up @@ -2013,8 +2013,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

startMigrationMonitoring := func() {
if mmt == nil {
mmt = time.NewTicker(500 * time.Millisecond)
mmt = time.NewTicker(10 * time.Millisecond)
mmtc = mmt.C
}
}

// adjustMigrationMonitoring moves migration monitoring onto the normal
// 500ms interval. An existing ticker is reset in place; otherwise a new
// ticker is created and its channel is published through mmtc.
adjustMigrationMonitoring := func() {
	const delay = 500 * time.Millisecond
	if mmt != nil {
		// Ticker already running (possibly at a faster rate), just re-time it.
		mmt.Reset(delay)
		return
	}
	mmt = time.NewTicker(delay)
	mmtc = mmt.C
}

Expand Down Expand Up @@ -2242,8 +2252,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

// Check to see where we are.
rg := mset.raftGroup()
ci := js.clusterInfo(rg)
mset.checkClusterInfo(ci)

// Track the new peers and check the ones that are current.
mset.mu.RLock()
Expand All @@ -2255,7 +2263,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
continue
}

newPeers, oldPeers, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas)
// Adjust to our normal time delay.
adjustMigrationMonitoring()

// Make sure we have correct cluster information on the other peers.
ci := js.clusterInfo(rg)
mset.checkClusterInfo(ci)

newPeers, _, newPeerSet, oldPeerSet := genPeerInfo(rg.Peers, len(rg.Peers)-replicas)

// If we are part of the new peerset and we have been passed the baton.
// We will handle scale down.
Expand All @@ -2281,23 +2296,18 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
continue
}

// We are good to go, can scale down here.
for _, p := range oldPeers {
n.ProposeRemovePeer(p)
}

csa := sa.copyGroup()
csa.Group.Peers = newPeers
csa.Group.Preferred = ourPeerId
csa.Group.Cluster = s.cachedClusterName()
cc.meta.ForwardProposal(encodeUpdateStreamAssignment(csa))
s.Noticef("Scaling down '%s > %s' to %+v", accName, sa.Config.Name, s.peerSetToNames(newPeers))

} else {
// We are the old leader here, from the original peer set.
// We are simply waiting on the new peerset to be caught up so we can transfer leadership.
var newLeaderPeer, newLeader string
neededCurrent, current := replicas/2+1, 0

for _, r := range ci.Replicas {
if r.Current && newPeerSet[r.Peer] {
current++
Expand Down Expand Up @@ -6877,7 +6887,7 @@ func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int

// Threshold for compression.
// TODO(dlc) - Eventually make configurable.
const compressThreshold = 4 * 1024
const compressThreshold = 256

// If allowed and contents over the threshold we will compress.
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte {
Expand Down
21 changes: 17 additions & 4 deletions server/raft.go
Expand Up @@ -855,7 +855,13 @@ func (n *raft) ResumeApply() {
}
}
n.hcommit = 0
n.resetElectionTimeout()

// If we had been selected to be the next leader, campaign here now that we have resumed.
if n.lxfer {
n.xferCampaign()
} else {
n.resetElectionTimeout()
}
}

// Applied is to be called when the FSM has applied the committed entries.
Expand Down Expand Up @@ -3158,9 +3164,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Only process these if they are new, so no replays or catchups.
if isNew {
maybeLeader := string(e.Data)
if maybeLeader == n.id && !n.observer && !n.paused {
n.lxfer = true
n.xferCampaign()
// This is us. We need to check if we can become the leader.
if maybeLeader == n.id {
// If not an observer and not paused we are good to go.
if !n.observer && !n.paused {
n.lxfer = true
n.xferCampaign()
} else if n.paused && !n.pobserver {
// Here we can become a leader but need to wait for resume of the apply channel.
n.lxfer = true
}
}
}
case EntryAddPeer:
Expand Down