Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Fix for datarace on a stream's clfs. #4508

Merged
merged 1 commit into from Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 3 additions & 6 deletions server/jetstream_cluster.go
Expand Up @@ -2892,9 +2892,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
} else if isRecovering {
// On recovery, reset CLFS/FAILED.
mset.mu.Lock()
mset.clfs = ss.Failed
mset.mu.Unlock()
mset.setCLFS(ss.Failed)
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
Expand Down Expand Up @@ -7268,7 +7266,7 @@ func (mset *stream) stateSnapshot() []byte {
func (mset *stream) stateSnapshotLocked() []byte {
// Decide if we can support the new style of stream snapshots.
if mset.supportsBinarySnapshotLocked() {
snap, _ := mset.store.EncodedStreamState(mset.clfs)
snap, _ := mset.store.EncodedStreamState(mset.getCLFS())
return snap
}

Expand Down Expand Up @@ -7470,10 +7468,9 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.clMu.Lock()
if mset.clseq == 0 || mset.clseq < lseq {
// Re-capture
lseq, clfs = mset.lastSeqAndCLFS()
lseq, clfs = mset.lseq, mset.clfs
mset.clseq = lseq + clfs
}

esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), mset.compressOK)
mset.clseq++

Expand Down
14 changes: 13 additions & 1 deletion server/stream.go
Expand Up @@ -979,7 +979,7 @@ func (mset *stream) rebuildDedupe() {
func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.lseq, mset.clfs
return mset.lseq, mset.getCLFS()
}

func (mset *stream) clearCLFS() uint64 {
Expand All @@ -990,6 +990,18 @@ func (mset *stream) clearCLFS() uint64 {
return clfs
}

func (mset *stream) getCLFS() uint64 {
mset.clMu.Lock()
defer mset.clMu.Unlock()
return mset.clfs
}

func (mset *stream) setCLFS(clfs uint64) {
mset.clMu.Lock()
mset.clfs = clfs
mset.clMu.Unlock()
}

func (mset *stream) lastSeq() uint64 {
mset.mu.RLock()
lseq := mset.lseq
Expand Down