When becoming leader optionally send current snapshot to followers if caught up.

This can help with syncing on restarts and reduce ghost ephemerals. Also added more code to suppress responses and API audits when we know we are recovering.
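
In sketch form, the new leader path works like this: install a snapshot of current state, and if the raft log holds no entries beyond it (meaning this node is fully caught up), push that snapshot to the followers. The Go sketch below is a minimal illustration against a stand-in raftNode interface; InstallSnapshot, SendSnapshot, and Size mirror how they are used in the diff, while fakeRaft and onBecomeLeader are hypothetical names for the example only.

package main

import "fmt"

// Stand-in for the subset of raft node methods used in the new leader path.
type raftNode interface {
    InstallSnapshot(snap []byte) error // compact our own log up to this snapshot
    SendSnapshot(snap []byte) error    // push the snapshot to followers
    Size() (entries, bytes uint64)     // entries still in the log beyond the snapshot
}

// On becoming leader: install a snapshot of current state, and if the log
// holds no entries beyond it (we are fully caught up), distribute it so
// restarted followers resync and ghost ephemeral consumers get cleaned up.
func onBecomeLeader(n raftNode, metaSnapshot func() []byte) {
    snap := metaSnapshot()
    if err := n.InstallSnapshot(snap); err != nil {
        return // Nothing to distribute; try again on the next snapshot pass.
    }
    if ne, _ := n.Size(); ne == 0 {
        n.SendSnapshot(snap)
    }
}

type fakeRaft struct{ sent bool }

func (f *fakeRaft) InstallSnapshot([]byte) error { return nil }
func (f *fakeRaft) SendSnapshot([]byte) error    { f.sent = true; return nil }
func (f *fakeRaft) Size() (uint64, uint64)       { return 0, 0 }

func main() {
    f := &fakeRaft{}
    onBecomeLeader(f, func() []byte { return []byte("meta-state") })
    fmt.Println("snapshot sent to followers:", f.sent) // true: log was fully caught up
}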

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Feb 23, 2023
1 parent f2c5e7e commit d347cb1
Showing 2 changed files with 184 additions and 61 deletions.
132 changes: 71 additions & 61 deletions server/jetstream_cluster.go
@@ -127,9 +127,10 @@ type streamAssignment struct {
Reply string `json:"reply"`
Restore *StreamState `json:"restore_state,omitempty"`
// Internal
consumers map[string]*consumerAssignment
responded bool
err error
consumers map[string]*consumerAssignment
responded bool
recovering bool
err error
}

// consumerAssignment is what the meta controller uses to assign consumers to streams.
@@ -144,9 +145,10 @@ type consumerAssignment struct {
Reply string `json:"reply"`
State *ConsumerState `json:"state,omitempty"`
// Internal
responded bool
deleted bool
err error
responded bool
recovering bool
deleted bool
err error
}
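
Both recovering fields live below the // Internal comment with the other unexported fields, so they are never serialized into the replicated assignment; each node tracks its own replay state locally. A minimal sketch of that Go property, with a simplified stand-in for the assignment structs:

package main

import (
    "encoding/json"
    "fmt"
)

// Simplified stand-in for streamAssignment/consumerAssignment: exported,
// tagged fields are replicated; unexported fields are in-memory bookkeeping.
type assignment struct {
    Subject string `json:"subject"`
    Reply   string `json:"reply"`
    // Internal
    responded  bool
    recovering bool
}

func main() {
    a := assignment{Subject: "foo", Reply: "bar", responded: true, recovering: true}
    b, _ := json.Marshal(a)
    // Unexported fields are invisible to encoding/json.
    fmt.Println(string(b)) // {"subject":"foo","reply":"bar"}
}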

// streamPurge is what the stream leader will replicate when purging a stream.
@@ -987,7 +989,6 @@ func (js *jetStream) monitorCluster() {
isLeader bool
lastSnap []byte
lastSnapTime time.Time
beenLeader bool
)

// Highwayhash key for generating hashes.
@@ -998,17 +999,21 @@ func (js *jetStream) monitorCluster() {
js.setMetaRecovering()

// Snapshotting function.
doSnapshot := func() {
doSnapshot := func() []byte {
// Suppress during recovery.
if js.isMetaRecovering() {
return
return nil
}
snap := js.metaSnapshot()
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
return snap
} else {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
}
return nil
}
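
doSnapshot now returns the snapshot bytes on a successful install so the caller can distribute them, while keeping the existing content-hash gate: if the state hashes the same as the last installed snapshot, the install is skipped. A minimal sketch of the shape, using crypto/sha256 in place of the keyed highwayhash the server uses, with installSnapshot as a hypothetical stand-in:

package main

import (
    "bytes"
    "crypto/sha256"
    "fmt"
    "time"
)

var (
    lastSnap     []byte    // hash of the last snapshot we installed
    lastSnapTime time.Time // when we installed it
)

// Install a snapshot only when its contents changed; return the snapshot
// bytes on a successful install so the caller may distribute them.
func doSnapshot(state []byte, installSnapshot func([]byte) error) []byte {
    hash := sha256.Sum256(state)
    if bytes.Equal(hash[:], lastSnap) {
        return nil // Identical to what is already installed.
    }
    if err := installSnapshot(state); err != nil {
        fmt.Println("error installing snapshot:", err)
        return nil
    }
    lastSnap, lastSnapTime = hash[:], time.Now()
    return state
}

func main() {
    install := func([]byte) error { return nil }
    fmt.Println(doSnapshot([]byte("v1"), install) != nil) // true: new content
    fmt.Println(doSnapshot([]byte("v1"), install) != nil) // false: deduped
}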

ru := &recoveryUpdates{
@@ -1059,6 +1064,15 @@ func (js *jetStream) monitorCluster() {
// FIXME(dlc) - Deal with errors.
if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
// If we processed a snapshot and are recovering remove our pending state.
if didSnap && js.isMetaRecovering() {
ru = &recoveryUpdates{
removeStreams: make(map[string]*streamAssignment),
removeConsumers: make(map[string]*consumerAssignment),
updateStreams: make(map[string]*streamAssignment),
updateConsumers: make(map[string]*consumerAssignment),
}
}
if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval {
// Since we received one make sure we have our own since we do not store
// our meta state outside of raft.
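
The reset above encodes a simple invariant: removals and updates observed while replaying the log are buffered in recoveryUpdates, but a snapshot applied mid-recovery already reflects everything before it, so the buffered deltas are stale and the buffer restarts empty. A compact sketch of that invariant, with string keys standing in for the real assignment values:

package main

import "fmt"

// Buffered deltas observed while replaying the raft log during recovery.
type recoveryUpdates struct {
    removeStreams map[string]bool
    updateStreams map[string]bool
}

func newRecoveryUpdates() *recoveryUpdates {
    return &recoveryUpdates{
        removeStreams: make(map[string]bool),
        updateStreams: make(map[string]bool),
    }
}

func main() {
    ru := newRecoveryUpdates()
    ru.updateStreams["ORDERS"] = true // delta buffered during replay

    // A snapshot entry is applied: it supersedes every buffered delta,
    // so start over with an empty buffer.
    ru = newRecoveryUpdates()
    fmt.Println(len(ru.updateStreams)) // 0
}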
@@ -1070,16 +1084,16 @@
}
aq.recycle(&ces)
case isLeader = <-lch:
// We want to make sure we are updated on statsz so ping the extended cluster.
js.processLeaderChange(isLeader)

if isLeader {
s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
}
js.processLeaderChange(isLeader)
if isLeader && !beenLeader {
beenLeader = true
if n.NeedSnapshot() {
if err := n.InstallSnapshot(js.metaSnapshot()); err != nil {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
// Install a snapshot as we become leader. We will also send to the cluster.
if snap := doSnapshot(); snap != nil {
// If we are caught up distribute our current state to followers.
if ne, _ := n.Size(); ne == 0 {
// Send our snapshot to others to make sure all in sync.
n.SendSnapshot(snap)
}
}
js.checkClusterSize()
@@ -1279,51 +1293,38 @@ func (js *jetStream) applyMetaSnapshot(buf []byte) error {
}
}
}
isRecovering := js.metaRecovering
js.mu.Unlock()

// Do removals first.
for _, sa := range saDel {
if isRecovering {
js.setStreamAssignmentRecovering(sa)
}
js.setStreamAssignmentRecovering(sa)
js.processStreamRemoval(sa)
}
// Now do add for the streams. Also add in all consumers.
for _, sa := range saAdd {
if isRecovering {
js.setStreamAssignmentRecovering(sa)
}
js.setStreamAssignmentRecovering(sa)
js.processStreamAssignment(sa)
// We can simply add the consumers.
for _, ca := range sa.consumers {
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
}
js.setConsumerAssignmentRecovering(ca)
js.processConsumerAssignment(ca)
}
}

// Perform updates on those in saChk. These were existing so make
// sure to process any changes.
for _, sa := range saChk {
if isRecovering {
js.setStreamAssignmentRecovering(sa)
}
js.setStreamAssignmentRecovering(sa)
js.processUpdateStreamAssignment(sa)
}

// Now do the deltas for existing stream's consumers.
for _, ca := range caDel {
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
}
js.setConsumerAssignmentRecovering(ca)
js.processConsumerRemoval(ca)
}
for _, ca := range caAdd {
if isRecovering {
js.setConsumerAssignmentRecovering(ca)
}
js.setConsumerAssignmentRecovering(ca)
js.processConsumerAssignment(ca)
}

@@ -1335,6 +1336,7 @@ func (js *jetStream) setStreamAssignmentRecovering(sa *streamAssignment) {
js.mu.Lock()
defer js.mu.Unlock()
sa.responded = true
sa.recovering = true
sa.Restore = nil
if sa.Group != nil {
sa.Group.Preferred = _EMPTY_
@@ -1346,6 +1348,7 @@ func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) {
js.mu.Lock()
defer js.mu.Unlock()
ca.responded = true
ca.recovering = true
if ca.Group != nil {
ca.Group.Preferred = _EMPTY_
}
@@ -2977,6 +2980,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAssignment) {
storage, cfg := sa.Config.Storage, sa.Config
hasResponded := sa.responded
sa.responded = true
recovering := sa.recovering
js.mu.Unlock()

mset, err := acc.lookupStream(cfg.Name)
@@ -3002,7 +3006,7 @@
js.mu.Unlock()
}
// Call update.
if err = mset.update(cfg); err != nil {
if err = mset.updateWithAdvisory(cfg, !recovering); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err)
}
// Set the new stream assignment.
@@ -3036,9 +3040,7 @@
return
}

mset.mu.RLock()
isLeader := mset.isLeader()
mset.mu.RUnlock()
isLeader := mset.IsLeader()

// Check for missing syncSubject bug.
if isLeader && osa != nil && osa.Sync == _EMPTY_ {
@@ -3054,7 +3056,7 @@
}

// Check if we should bail.
if !isLeader || hasResponded {
if !isLeader || hasResponded || recovering {
return
}
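
Adding recovering to this guard is the suppression pattern the commit message describes, and it recurs below in the create, delete, and consumer paths: during recovery the original client already received its reply in a previous run, so replaying an assignment must not answer again. A minimal sketch, with sendResponse as a hypothetical stand-in:

package main

import "fmt"

// Decide whether a replicated assignment should produce a client response.
// Only the leader answers, each request is answered once, and nothing is
// answered while merely replaying state during recovery.
func maybeRespond(isLeader, hasResponded, recovering bool, sendResponse func()) {
    if !isLeader || hasResponded || recovering {
        return
    }
    sendResponse()
}

func main() {
    send := func() { fmt.Println("api response sent") }
    maybeRespond(true, false, true, send)  // recovering: suppressed
    maybeRespond(true, false, false, send) // normal path: responds
}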

@@ -3114,19 +3116,21 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignment) {
if sa.Group.Name == osa.Group.Name && reflect.DeepEqual(sa.Group.Peers, osa.Group.Peers) {
// Since this already exists we know it succeeded, just respond to this caller.
js.mu.RLock()
client, subject, reply := sa.Client, sa.Subject, sa.Reply
client, subject, reply, recovering := sa.Client, sa.Subject, sa.Reply, sa.recovering
js.mu.RUnlock()

var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
if !recovering {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
return
} else {
// We had a bug where we could have multiple assignments for the same
@@ -3342,6 +3346,7 @@ func (js *jetStream) processClusterDeleteStream(sa *streamAssignment, isMember, wasLeader bool) {
if cc := js.cluster; cc != nil {
isMetaLeader = cc.isLeader()
}
recovering := sa.recovering
js.mu.RUnlock()

stopped := false
@@ -3400,7 +3405,7 @@ }
}

// Do not respond if the account does not exist any longer
if acc == nil {
if acc == nil || recovering {
return
}

@@ -3653,7 +3658,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state *ConsumerState, wasExisting bool) {
// This consumer exists.
// Only update if config is really different.
cfg := o.config()
if !reflect.DeepEqual(&cfg, ca.Config) {
if isConfigUpdate = !reflect.DeepEqual(&cfg, ca.Config); isConfigUpdate {
// Call into update, ignore consumer exists error here since this means an old deliver subject is bound
// which can happen on restart etc.
if err := o.updateConfig(ca.Config); err != nil && err != NewJSConsumerNameExistError() {
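
Folding the comparison into the isConfigUpdate assignment means an add for an existing consumer is only treated as an update when the incoming config actually differs. A minimal sketch of that check; the config type is a stand-in, not the server's ConsumerConfig:

package main

import (
    "fmt"
    "reflect"
)

// Stand-in consumer config.
type config struct {
    MaxDeliver int
    AckWait    int
}

// Apply the proposed config only when it differs from the current one,
// reporting whether this request was a real update.
func applyIfChanged(cur, proposed *config, update func(*config) error) (bool, error) {
    if changed := !reflect.DeepEqual(cur, proposed); changed {
        return true, update(proposed)
    }
    return false, nil
}

func main() {
    cur := &config{MaxDeliver: 5}
    update := func(*config) error { return nil }

    changed, _ := applyIfChanged(cur, &config{MaxDeliver: 5}, update)
    fmt.Println(changed) // false: identical config, no update issued

    changed, _ = applyIfChanged(cur, &config{MaxDeliver: 10}, update)
    fmt.Println(changed) // true: real change, update applied
}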
@@ -3693,7 +3698,9 @@
// If we look like we are scaling up, let's send our current state to the group.
sendState = len(ca.Group.Peers) > len(oca.Group.Peers) && o.IsLeader() && n != nil
// Signal that this is an update
isConfigUpdate = true
if ca.Reply != _EMPTY_ {
isConfigUpdate = true
}
}
js.mu.RUnlock()

@@ -3794,13 +3801,15 @@
// For existing consumer, only send response if not recovering.
if wasExisting && !js.isMetaRecovering() {
if o.IsLeader() || (!didCreate && needsLocalResponse) {
// Process if existing as an update.
// Process if existing as an update. Double check that this is not recovered.
js.mu.RLock()
client, subject, reply := ca.Client, ca.Subject, ca.Reply
client, subject, reply, recovering := ca.Client, ca.Subject, ca.Reply, ca.recovering
js.mu.RUnlock()
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
resp.ConsumerInfo = o.info()
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
if !recovering {
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
resp.ConsumerInfo = o.info()
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
}
}
}
@@ -3819,6 +3828,7 @@ func (js *jetStream) processClusterDeleteConsumer(ca *consumerAssignment, isMember, wasLeader bool) {
if cc := js.cluster; cc != nil {
isMetaLeader = cc.isLeader()
}
recovering := ca.recovering
js.mu.RUnlock()

stopped := false
@@ -3856,8 +3866,8 @@ }
}
}

// Do not respond if the account does not exist any longer
if acc == nil {
// Do not respond if the account does not exist any longer or this is during recovery.
if acc == nil || recovering {
return
}
