[FIXED] Fixed deadlock when checkAndSync was being called as part of storing message #4411

Merged 3 commits on Aug 21, 2023
2 changes: 1 addition & 1 deletion .travis.yml
@@ -37,7 +37,7 @@ jobs:
- name: "Run all tests from all other packages"
env: TEST_SUITE=non_srv_pkg_tests
- name: "Compile with older Go release"
go: 1.18.x
go: 1.19.12
env: TEST_SUITE=build_only

script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
14 changes: 13 additions & 1 deletion server/jetstream.go
@@ -148,6 +148,9 @@ type jsAccount struct {
// From server
sendq *ipQueue[*pubMsg]

// For limiting only running one checkAndSync at a time.
sync atomic.Bool

// Usage/limits related fields that will be protected by usageMu
usageMu sync.RWMutex
limits map[string]JetStreamAccountLimits // indexed by tierName
@@ -1811,6 +1814,12 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account
// When we detect a skew of some sort this will verify the usage reporting is correct.
// No locks should be held.
func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) {
// This will run in a separate go routine, so check that we are only running once.
if !jsa.sync.CompareAndSwap(false, true) {
return
}
defer jsa.sync.Store(false)

// Hold the account read lock and the usage lock while we calculate.
// We scope by tier and storage type, but if R3 File has 200 streams etc. could
// show a pause. I did test with > 100 non-active streams and was 80-200ns or so.
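
The CompareAndSwap on the new sync flag acts as a single-flight guard: of all the goroutines that reach checkAndSyncUsage concurrently, exactly one wins the swap from false to true and performs the recalculation, everyone else returns immediately, and the deferred Store(false) re-arms the guard once the pass finishes. Note that atomic.Bool was added in Go 1.19, which is presumably why the "Compile with older Go release" job above moves from 1.18.x to 1.19.12. A minimal standalone sketch of the pattern, with illustrative names rather than the actual NATS types:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type account struct {
	syncing atomic.Bool // set while a sync pass is in flight
}

func (a *account) checkAndSync() {
	// Only the caller that flips false -> true proceeds; all others bail out.
	if !a.syncing.CompareAndSwap(false, true) {
		return
	}
	defer a.syncing.Store(false)

	time.Sleep(10 * time.Millisecond) // stand-in for the real usage recalculation
	fmt.Println("sync pass ran")
}

func main() {
	var a account
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.checkAndSync()
		}()
	}
	wg.Wait() // for this burst, typically a single "sync pass ran" is printed
}
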
@@ -1916,7 +1925,10 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta
jsa.usageMu.Unlock()

if needsCheck {
jsa.checkAndSyncUsage(tierName, storeType)
// We could be holding the stream lock from up in the stack, and this
// will want the jsa lock, which would violate locking order.
// So do this in a Go routine. The function will check if it is already running.
go jsa.checkAndSyncUsage(tierName, storeType)
}
}

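Moving the call behind go also changes where the locks are taken. As the new comment explains, updateUsage can be reached while the caller still holds the stream lock, and checkAndSyncUsage wants the jsa-level locks; taking them inline from under the stream lock would violate the locking order used elsewhere and can deadlock. Dispatching the check to its own goroutine lets the store path release its locks without ever waiting on the account-level ones, and the single-flight guard above keeps those goroutines from piling up. A generic reduction of that situation, with hypothetical locks rather than the real NATS lock graph:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Hypothetical locks standing in for the stream lock and the jsa/usage locks.
var (
	accountMu sync.Mutex // elsewhere acquired before streamMu
	streamMu  sync.Mutex
)

func checkUsage() {
	accountMu.Lock()
	defer accountMu.Unlock()
	fmt.Println("usage recalculated")
}

func storeMsg() {
	streamMu.Lock()
	defer streamMu.Unlock()

	// Calling checkUsage() inline here would block on accountMu while holding
	// streamMu, inverting the accountMu -> streamMu order used elsewhere and
	// inviting deadlock. Running it in its own goroutine lets this caller
	// release streamMu without ever waiting on accountMu.
	go checkUsage()
}

func main() {
	storeMsg()
	time.Sleep(50 * time.Millisecond) // demo only: let the async check finish
}
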
27 changes: 27 additions & 0 deletions server/jetstream_test.go
@@ -20408,3 +20408,30 @@ func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) {
})
}
}

func TestJetStreamUsageSyncDeadlock(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
})
require_NoError(t, err)

sendStreamMsg(t, nc, "foo", "hello")

// Now purposely mess up the usage that will force a sync.
// Without the fix this will deadlock.
jsa := s.getJetStream().lookupAccount(s.GlobalAccount())
jsa.usageMu.Lock()
st, ok := jsa.usage[_EMPTY_]
require_True(t, ok)
st.local.store = -1000
jsa.usageMu.Unlock()

sendStreamMsg(t, nc, "foo", "hello")
}
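
Skewing st.local.store to a negative value ensures the next publish's updateUsage detects the inconsistency and sets needsCheck, so the second sendStreamMsg exercises exactly the path that used to call checkAndSyncUsage inline from the store path: with the fix it returns promptly, without it the test hangs. If a faster failure than the suite-level timeout is wanted, the final publish could be bounded explicitly. A sketch of that variant (not part of this PR; assumes the context and time packages are imported):

// Publish with an explicit deadline so a reintroduced deadlock fails the
// test quickly instead of hanging.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := js.Publish("foo", []byte("hello"), nats.Context(ctx)); err != nil {
	t.Fatalf("publish did not complete, store path may be deadlocked: %v", err)
}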