Skip to content

Commit

Permalink
[FIXED] Fix for datarace on a stream's clfs. (#4508)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 10, 2023
2 parents 5def0a9 + 7d041da commit 01aab6d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
9 changes: 3 additions & 6 deletions server/jetstream_cluster.go
Expand Up @@ -2892,9 +2892,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
} else if isRecovering {
// On recovery, reset CLFS/FAILED.
mset.mu.Lock()
mset.clfs = ss.Failed
mset.mu.Unlock()
mset.setCLFS(ss.Failed)
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
Expand Down Expand Up @@ -7268,7 +7266,7 @@ func (mset *stream) stateSnapshot() []byte {
func (mset *stream) stateSnapshotLocked() []byte {
// Decide if we can support the new style of stream snapshots.
if mset.supportsBinarySnapshotLocked() {
snap, _ := mset.store.EncodedStreamState(mset.clfs)
snap, _ := mset.store.EncodedStreamState(mset.getCLFS())
return snap
}

Expand Down Expand Up @@ -7470,10 +7468,9 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.clMu.Lock()
if mset.clseq == 0 || mset.clseq < lseq {
// Re-capture
lseq, clfs = mset.lastSeqAndCLFS()
lseq, clfs = mset.lseq, mset.clfs
mset.clseq = lseq + clfs
}

esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), mset.compressOK)
mset.clseq++

Expand Down
14 changes: 13 additions & 1 deletion server/stream.go
Expand Up @@ -979,7 +979,7 @@ func (mset *stream) rebuildDedupe() {
func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.lseq, mset.clfs
return mset.lseq, mset.getCLFS()
}

func (mset *stream) clearCLFS() uint64 {
Expand All @@ -990,6 +990,18 @@ func (mset *stream) clearCLFS() uint64 {
return clfs
}

func (mset *stream) getCLFS() uint64 {
mset.clMu.Lock()
defer mset.clMu.Unlock()
return mset.clfs
}

func (mset *stream) setCLFS(clfs uint64) {
mset.clMu.Lock()
mset.clfs = clfs
mset.clMu.Unlock()
}

func (mset *stream) lastSeq() uint64 {
mset.mu.RLock()
lseq := mset.lseq
Expand Down

0 comments on commit 01aab6d

Please sign in to comment.