Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Do not hold jetstream lock cleaning up orphans, could deadlock. #3945

Merged
merged 2 commits into from Mar 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 33 additions & 7 deletions server/jetstream_cluster.go
Expand Up @@ -960,36 +960,62 @@ type recoveryUpdates struct {
// Streams and consumers are recovered from disk, and the meta layer's mappings
// should clean them up, but under crash scenarios there could be orphans.
func (js *jetStream) checkForOrphans() {
js.mu.Lock()
defer js.mu.Unlock()

consumerName := func(o *consumer) string {
o.mu.RLock()
defer o.mu.RUnlock()
return o.name
}

// Can not hold jetstream lock while trying to delete streams or consumers.
js.mu.Lock()
s, cc := js.srv, js.cluster
s.Debugf("JetStream cluster checking for orphans")

var streams []*stream
var consumers []*consumer

for accName, jsa := range js.accounts {
asa := cc.streams[accName]
for stream, mset := range jsa.streams {
if sa := asa[stream]; sa == nil {
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
mset.delete()
streams = append(streams, mset)
} else {
// This one is good, check consumers now.
for _, o := range mset.getConsumers() {
consumer := consumerName(o)
if sa.consumers[consumer] == nil {
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
o.delete()
consumers = append(consumers, o)
}
}
}
}
}
js.mu.Unlock()

for _, mset := range streams {
mset.mu.RLock()
accName, stream := mset.acc.Name, mset.cfg.Name
mset.mu.RUnlock()
s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream)
if err := mset.delete(); err != nil {
s.Warnf("Deleting stream encountered an error: %v", err)
}
}
for _, o := range consumers {
o.mu.RLock()
accName, mset, consumer := o.acc.Name, o.mset, o.name
o.mu.RUnlock()
stream := "N/A"
if mset != nil {
mset.mu.RLock()
stream = mset.cfg.Name
mset.mu.RUnlock()
}
s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer)
if err := o.delete(); err != nil {
s.Warnf("Deleting consumer encountered an error: %v", err)
}
}
}

func (js *jetStream) monitorCluster() {
Expand Down