Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] When becoming meta-leader optionally send current snapshot to followers. #3904

Merged
merged 1 commit on Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 71 additions & 61 deletions server/jetstream_cluster.go
Expand Up @@ -127,9 +127,10 @@ type streamAssignment struct {
Reply string `json:"reply"`
Restore *StreamState `json:"restore_state,omitempty"`
// Internal
consumers map[string]*consumerAssignment
responded bool
err error
consumers map[string]*consumerAssignment
responded bool
recovering bool
err error
}

// consumerAssignment is what the meta controller uses to assign consumers to streams.
Expand All @@ -144,9 +145,10 @@ type consumerAssignment struct {
Reply string `json:"reply"`
State *ConsumerState `json:"state,omitempty"`
// Internal
responded bool
deleted bool
err error
responded bool
recovering bool
deleted bool
err error
}

// streamPurge is what the stream leader will replicate when purging a stream.
Expand Down Expand Up @@ -987,7 +989,6 @@ func (js *jetStream) monitorCluster() {
isLeader bool
lastSnap []byte
lastSnapTime time.Time
beenLeader bool
)

// Highwayhash key for generating hashes.
Expand All @@ -998,17 +999,21 @@ func (js *jetStream) monitorCluster() {
js.setMetaRecovering()

// Snapshotting function.
doSnapshot := func() {
doSnapshot := func() []byte {
// Suppress during recovery.
if js.isMetaRecovering() {
return
return nil
}
snap := js.metaSnapshot()
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
return snap
} else {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
}
return nil
}

ru := &recoveryUpdates{
Expand Down Expand Up @@ -1059,6 +1064,15 @@ func (js *jetStream) monitorCluster() {
// FIXME(dlc) - Deal with errors.
if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
// If we processed a snapshot and are recovering remove our pending state.
if didSnap && js.isMetaRecovering() {
ru = &recoveryUpdates{
removeStreams: make(map[string]*streamAssignment),
removeConsumers: make(map[string]*consumerAssignment),
updateStreams: make(map[string]*streamAssignment),
updateConsumers: make(map[string]*consumerAssignment),
}
}
if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval {
// Since we received one make sure we have our own since we do not store
// our meta state outside of raft.
Expand All @@ -1070,16 +1084,16 @@ func (js *jetStream) monitorCluster() {
}
aq.recycle(&ces)
case isLeader = <-lch:
// We want to make sure we are updated on statsz so ping the extended cluster.
js.processLeaderChange(isLeader)

if isLeader {
s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
}
js.processLeaderChange(isLeader)
if isLeader && !beenLeader {
beenLeader = true
if n.NeedSnapshot() {
if err := n.InstallSnapshot(js.metaSnapshot()); err != nil {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
// Install a snapshot as we become leader. We will also send to the cluster.
if snap := doSnapshot(); snap != nil {
// If we are caught up distribute our current state to followers.
if ne, _ := n.Size(); ne == 0 {
// Send our snapshot to others to make sure all in sync.
n.SendSnapshot(snap)
}
}
js.checkClusterSize()
Expand Down Expand Up @@ -1279,51 +1293,38 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
}
}
}
isRecovering := js.metaRecovering
js.mu.Unlock()

// Do removals first.
for _, sa := range saDel {
if isRecovering {
js.setStreamAssignmentRecovering(sa)
}
js.setStreamAssignmentRecovering(sa)
js.processStreamRemoval(sa)
}
// Now do add for the streams. Also add in all consumers.
for _, sa := range saAdd {
if isRecovering {
js.setStreamAssignmentRecovering(sa)
}
js.setStreamAssignmentRecovering(sa)
js.processStreamAssignment(sa)
// We can simply add the consumers.
for _, ca := range sa.consumers {
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
}
js.setConsumerAssignmentRecovering(ca)
js.processConsumerAssignment(ca)
}
}

// Perform updates on those in saChk. These were existing so make
// sure to process any changes.
for _, sa := range saChk {
if isRecovering {
js.setStreamAssignmentRecovering(sa)
}
js.setStreamAssignmentRecovering(sa)
js.processUpdateStreamAssignment(sa)
}

// Now do the deltas for existing stream's consumers.
for _, ca := range caDel {
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
}
js.setConsumerAssignmentRecovering(ca)
js.processConsumerRemoval(ca)
}
for _, ca := range caAdd {
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
}
js.setConsumerAssignmentRecovering(ca)
js.processConsumerAssignment(ca)
}

Expand All @@ -1335,6 +1336,7 @@ func (js *jetStream) setStreamAssignmentRecovering(sa *streamAssignment) {
js.mu.Lock()
defer js.mu.Unlock()
sa.responded = true
sa.recovering = true
sa.Restore = nil
if sa.Group != nil {
sa.Group.Preferred = _EMPTY_
Expand All @@ -1346,6 +1348,7 @@ func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) {
js.mu.Lock()
defer js.mu.Unlock()
ca.responded = true
ca.recovering = true
if ca.Group != nil {
ca.Group.Preferred = _EMPTY_
}
Expand Down Expand Up @@ -2977,6 +2980,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
storage, cfg := sa.Config.Storage, sa.Config
hasResponded := sa.responded
sa.responded = true
recovering := sa.recovering
js.mu.Unlock()

mset, err := acc.lookupStream(cfg.Name)
Expand All @@ -3002,7 +3006,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
js.mu.Unlock()
}
// Call update.
if err = mset.update(cfg); err != nil {
if err = mset.updateWithAdvisory(cfg, !recovering); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err)
}
// Set the new stream assignment.
Expand Down Expand Up @@ -3036,9 +3040,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
return
}

mset.mu.RLock()
isLeader := mset.isLeader()
mset.mu.RUnlock()
isLeader := mset.IsLeader()

// Check for missing syncSubject bug.
if isLeader && osa != nil && osa.Sync == _EMPTY_ {
Expand All @@ -3054,7 +3056,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
}

// Check if we should bail.
if !isLeader || hasResponded {
if !isLeader || hasResponded || recovering {
return
}

Expand Down Expand Up @@ -3114,19 +3116,21 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if sa.Group.Name == osa.Group.Name && reflect.DeepEqual(sa.Group.Peers, osa.Group.Peers) {
// Since this already exists we know it succeeded, just respond to this caller.
js.mu.RLock()
client, subject, reply := sa.Client, sa.Subject, sa.Reply
client, subject, reply, recovering := sa.Client, sa.Subject, sa.Reply, sa.recovering
js.mu.RUnlock()

var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
if !recovering {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
return
} else {
// We had a bug where we could have multiple assignments for the same
Expand Down Expand Up @@ -3342,6 +3346,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
if cc := js.cluster; cc != nil {
isMetaLeader = cc.isLeader()
}
recovering := sa.recovering
js.mu.RUnlock()

stopped := false
Expand Down Expand Up @@ -3400,7 +3405,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember,
}

// Do not respond if the account does not exist any longer
if acc == nil {
if acc == nil || recovering {
return
}

Expand Down Expand Up @@ -3653,7 +3658,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// This consumer exists.
// Only update if config is really different.
cfg := o.config()
if !reflect.DeepEqual(&cfg, ca.Config) {
if isConfigUpdate = !reflect.DeepEqual(&cfg, ca.Config); isConfigUpdate {
// Call into update, ignore consumer exists error here since this means an old deliver subject is bound
// which can happen on restart etc.
if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() {
Expand Down Expand Up @@ -3693,7 +3698,9 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// If we look like we are scaling up, let's send our current state to the group.
sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && o.IsLeader() && n != nil
// Signal that this is an update
isConfigUpdate = true
if ca.Reply != _EMPTY_ {
isConfigUpdate = true
}
}
js.mu.RUnlock()

Expand Down Expand Up @@ -3794,13 +3801,15 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
// For existing consumer, only send response if not recovering.
if wasExisting && !js.isMetaRecovering() {
if o.IsLeader() || (!didCreate && needsLocalResponse) {
// Process if existing as an update.
// Process if existing as an update. Double check that this is not recovered.
js.mu.RLock()
client, subject, reply := ca.Client, ca.Subject, ca.Reply
client, subject, reply, recovering := ca.Client, ca.Subject, ca.Reply, ca.recovering
js.mu.RUnlock()
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
resp.ConsumerInfo = o.info()
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
if !recovering {
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
resp.ConsumerInfo = o.info()
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
}
}
}
Expand All @@ -3819,6 +3828,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
if cc := js.cluster; cc != nil {
isMetaLeader = cc.isLeader()
}
recovering := ca.recovering
js.mu.RUnlock()

stopped := false
Expand Down Expand Up @@ -3856,8 +3866,8 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMemb
}
}

// Do not respond if the account does not exist any longer
if acc == nil {
// Do not respond if the account does not exist any longer or this is during recovery.
if acc == nil || recovering {
return
}

Expand Down