Skip to content

Commit

Permalink
[FIXED] Fixed deadlock when checkAndSync was being called as part of …
Browse files Browse the repository at this point in the history
…storing message (#4411)

Calling checkAndSyncUsage inline while storing a message violated the lock
ordering, so it now runs in a separate goroutine and is guarded so that only one instance runs at a time.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Aug 21, 2023
2 parents 3377f04 + 0a86bf4 commit 6e3ae20
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -37,7 +37,7 @@ jobs:
- name: "Run all tests from all other packages"
env: TEST_SUITE=non_srv_pkg_tests
- name: "Compile with older Go release"
go: 1.18.x
go: 1.19.12
env: TEST_SUITE=build_only

script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
Expand Down
14 changes: 13 additions & 1 deletion server/jetstream.go
Expand Up @@ -148,6 +148,9 @@ type jsAccount struct {
// From server
sendq *ipQueue[*pubMsg]

// Ensures only one checkAndSyncUsage is running at a time.
sync atomic.Bool

// Usage/limits related fields that will be protected by usageMu
usageMu sync.RWMutex
limits map[string]JetStreamAccountLimits // indexed by tierName
Expand Down Expand Up @@ -1811,6 +1814,12 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account
// When we detect a skew of some sort this will verify the usage reporting is correct.
// No locks should be held.
func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) {
// This runs in a separate goroutine, so make sure only one instance is running at a time.
if !jsa.sync.CompareAndSwap(false, true) {
return
}
defer jsa.sync.Store(false)

// Hold the account read lock and the usage lock while we calculate.
// We scope by tier and storage type, but if R3 File has 200 streams etc. could
// show a pause. I did test with > 100 non-active streams and was 80-200ns or so.
Expand Down Expand Up @@ -1916,7 +1925,10 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta
jsa.usageMu.Unlock()

if needsCheck {
jsa.checkAndSyncUsage(tierName, storeType)
// We could be holding the stream lock from up in the stack, and this
// will want the jsa lock, which would violate locking order.
// So do this in a Go routine. The function will check if it is already running.
go jsa.checkAndSyncUsage(tierName, storeType)
}
}

Expand Down
27 changes: 27 additions & 0 deletions server/jetstream_test.go
Expand Up @@ -20408,3 +20408,30 @@ func TestJetStreamLastSequenceBySubjectConcurrent(t *testing.T) {
})
}
}

// TestJetStreamUsageSyncDeadlock checks that a message can still be stored
// after the account's locally tracked store usage has been deliberately
// corrupted, which forces a usage check-and-sync on the next store.
func TestJetStreamUsageSyncDeadlock(t *testing.T) {
	srv := RunBasicJetStreamServer(t)
	defer srv.Shutdown()

	nc, js := jsClientConnect(t, srv)
	defer nc.Close()

	cfg := &nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"*"},
	}
	_, err := js.AddStream(cfg)
	require_NoError(t, err)

	// Store one message so the account has usage recorded.
	sendStreamMsg(t, nc, "foo", "hello")

	// Deliberately skew the recorded usage so the next store forces a sync.
	// Before the fix, that sync would deadlock.
	jsa := srv.getJetStream().lookupAccount(srv.GlobalAccount())
	jsa.usageMu.Lock()
	entry, found := jsa.usage[_EMPTY_]
	require_True(t, found)
	entry.local.store = -1000
	jsa.usageMu.Unlock()

	// This store must complete rather than hang.
	sendStreamMsg(t, nc, "foo", "hello")
}

0 comments on commit 6e3ae20

Please sign in to comment.