Skip to content

Commit

Permalink
[FIXED] Account resolver lock inversion (#4588)
Browse files Browse the repository at this point in the history
There was a lock inversion, but it was low risk since it happened during
server initialization. Still, it was fixed, and the ordering was added to
the locksordering.txt file.

Also fixed multiple lock inversions that were caused by tests.

Signed-off-by: Ivan Kozlovic <ijkozlovic@gmail.com>
  • Loading branch information
derekcollison committed Sep 26, 2023
2 parents 3056af0 + a84ce61 commit ee4d6ee
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 44 deletions.
2 changes: 2 additions & 0 deletions locksordering.txt
Expand Up @@ -14,3 +14,5 @@ allow that lock to be held and then acquire a client lock, which is not
possible with the normal account lock.

accountLeafList -> client

AccountResolver interface has various implementations, but assume: AccountResolver -> Server
21 changes: 0 additions & 21 deletions server/jetstream_cluster_2_test.go
Expand Up @@ -2734,25 +2734,6 @@ func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) {
}
}

// jsClusterAccountLimitsTempl is a server config template for a JetStream
// cluster whose $JS account has tight JetStream limits (1MB max store, no
// memory storage), used by tests exercising account-level resource limits.
// Placeholders: server name, store dir, cluster name, cluster listen port,
// and the routes list, in that order.
var jsClusterAccountLimitsTempl = `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}
	no_auth_user: js
	accounts {
		$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
	}
`

func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) {
// Purposely make this unbalanced. Without changes this will never form a quorum to elect the meta-leader.
c := createMixedModeCluster(t, jsMixedModeGlobalAccountTempl, "MMCS5", _EMPTY_, 3, 4, false)
Expand Down Expand Up @@ -5552,10 +5533,8 @@ func TestJetStreamClusterConsumerOverrides(t *testing.T) {
o := mset.lookupConsumer("m")
require_True(t, o != nil)

o.mu.RLock()
st := o.store.Type()
n := o.raftNode()
o.mu.RUnlock()
require_True(t, n != nil)
rn := n.(*raft)
rn.RLock()
Expand Down
48 changes: 27 additions & 21 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -1372,7 +1372,7 @@ func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T)
}

// https://github.com/nats-io/nats-server/issues/3677
func TestJetStreamParallelStreamCreation(t *testing.T) {
func TestJetStreamClusterParallelStreamCreation(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -1414,7 +1414,7 @@ func TestJetStreamParallelStreamCreation(t *testing.T) {

// In addition to test above, if streams were attempted to be created in parallel
// it could be that multiple raft groups would be created for the same asset.
func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
func TestJetStreamClusterParallelStreamCreationDupeRaftGroups(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -1463,19 +1463,19 @@ func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
expected := 2
rg := make(map[string]struct{})
for _, s := range c.servers {
s.mu.RLock()
s.rnMu.RLock()
for _, ni := range s.raftNodes {
n := ni.(*raft)
rg[n.Group()] = struct{}{}
}
s.mu.RUnlock()
s.rnMu.RUnlock()
}
if len(rg) != expected {
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
}
}

func TestJetStreamParallelConsumerCreation(t *testing.T) {
func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -1538,19 +1538,19 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
expected := 3
rg := make(map[string]struct{})
for _, s := range c.servers {
s.mu.RLock()
s.rnMu.RLock()
for _, ni := range s.raftNodes {
n := ni.(*raft)
rg[n.Group()] = struct{}{}
}
s.mu.RUnlock()
s.rnMu.RUnlock()
}
if len(rg) != expected {
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
}
}

func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) {
func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -3470,19 +3470,20 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
}()
defer close(qch)

s.mu.RLock()
gacc := s.gacc
s.mu.RUnlock()
if gacc == nil {
t.Fatalf("No global account")
}
// Make sure we do not have any R1 assets placed on the lameduck server.
for s.isRunning() {
s.mu.RLock()
if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil {
s.mu.RUnlock()
break
}
hasAsset := len(s.js.srv.gacc.streams()) > 0
s.mu.RUnlock()
if hasAsset {
if len(gacc.streams()) > 0 {
t.Fatalf("Server had an R1 asset when it should not due to lameduck mode")
}
time.Sleep(15 * time.Millisecond)
}
s.WaitForShutdown()
}

// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
Expand Down Expand Up @@ -3999,9 +4000,14 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
sa.Group.Cluster = _EMPTY_
sa.Group.Preferred = _EMPTY_
// Insert into meta layer.
s.mu.RLock()
s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
s.mu.RUnlock()
if sjs := s.getJetStream(); sjs != nil {
sjs.mu.RLock()
meta := sjs.cluster.meta
sjs.mu.RUnlock()
if meta != nil {
meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
}
}
// Make sure it got propagated..
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
sa := mset.streamAssignment().copyGroup()
Expand Down Expand Up @@ -4714,7 +4720,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
require_True(t, si.State.Msgs == uint64(toSend))
}

func TestJetStreamBinaryStreamSnapshotCapability(t *testing.T) {
func TestJetStreamClusterBinaryStreamSnapshotCapability(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

Expand Down Expand Up @@ -4783,7 +4789,7 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
}
}

func TestJetStreamAccountUsageDrifts(t *testing.T) {
func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
tmpl := `
listen: 127.0.0.1:-1
server_name: %s
Expand Down
19 changes: 19 additions & 0 deletions server/jetstream_helpers_test.go
Expand Up @@ -281,6 +281,25 @@ var jsMixedModeGlobalAccountTempl = `

var jsGWTempl = `%s{name: %s, urls: [%s]}`

// jsClusterAccountLimitsTempl is a server config template for a JetStream
// cluster whose $JS account has tight JetStream limits (1MB max store, no
// memory storage), used by tests exercising account-level resource limits.
// Placeholders: server name, store dir, cluster name, cluster listen port,
// and the routes list, in that order.
var jsClusterAccountLimitsTempl = `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}
	no_auth_user: js
	accounts {
		$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
	}
`

// createJetStreamTaggedSuperCluster builds a tagged JetStream supercluster
// for tests, delegating to the GW-proxy variant with no proxy configured.
func createJetStreamTaggedSuperCluster(t *testing.T) *supercluster {
	sc := createJetStreamTaggedSuperClusterWithGWProxy(t, nil)
	return sc
}
Expand Down
10 changes: 8 additions & 2 deletions server/server.go
Expand Up @@ -1272,6 +1272,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)

// Setup the account resolver. For memory resolver, make sure the JWTs are
// properly formed but do not enforce expiration etc.
// Lock is held on entry, but may be released/reacquired during this call.
func (s *Server) configureResolver() error {
opts := s.getOpts()
s.accResolver = opts.AccountResolver
Expand All @@ -1286,15 +1287,20 @@ func (s *Server) configureResolver() error {
}
}
if len(opts.resolverPreloads) > 0 {
if s.accResolver.IsReadOnly() {
// Lock ordering is account resolver -> server, so we need to release
// the lock and reacquire it when done with account resolver's calls.
ar := s.accResolver
s.mu.Unlock()
defer s.mu.Lock()
if ar.IsReadOnly() {
return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
}
for k, v := range opts.resolverPreloads {
_, err := jwt.DecodeAccountClaims(v)
if err != nil {
return fmt.Errorf("preload account error for %q: %v", k, err)
}
s.accResolver.Store(k, v)
ar.Store(k, v)
}
}
}
Expand Down

0 comments on commit ee4d6ee

Please sign in to comment.