Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Account resolver lock inversion #4588

Merged
merged 1 commit into from Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions locksordering.txt
Expand Up @@ -14,3 +14,5 @@ allow that lock to be held and then acquire a client lock which is not
possible with the normal account lock.

accountLeafList -> client

AccountResolver interface has various implementations, but assume: AccountResolver -> Server
21 changes: 0 additions & 21 deletions server/jetstream_cluster_2_test.go
Expand Up @@ -2734,25 +2734,6 @@ func TestJetStreamClusterFlowControlRequiresHeartbeats(t *testing.T) {
}
}

// jsClusterAccountLimitsTempl is a clustered JetStream server config
// template enforcing per-account JetStream limits: the $JS account is
// capped at 1MB of store and 0 memory, and $SYS is the system account.
// Sprintf placeholders, in order: server name, store directory,
// cluster name, cluster listen port, routes.
var jsClusterAccountLimitsTempl = `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}

	no_auth_user: js

	accounts {
		$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
	}
`

func TestJetStreamClusterMixedModeColdStartPrune(t *testing.T) {
// Purposely make this unbalanced. Without changes this will never form a quorum to elect the meta-leader.
c := createMixedModeCluster(t, jsMixedModeGlobalAccountTempl, "MMCS5", _EMPTY_, 3, 4, false)
Expand Down Expand Up @@ -5552,10 +5533,8 @@ func TestJetStreamClusterConsumerOverrides(t *testing.T) {
o := mset.lookupConsumer("m")
require_True(t, o != nil)

o.mu.RLock()
st := o.store.Type()
n := o.raftNode()
o.mu.RUnlock()
require_True(t, n != nil)
rn := n.(*raft)
rn.RLock()
Expand Down
48 changes: 27 additions & 21 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -1372,7 +1372,7 @@ func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T)
}

// https://github.com/nats-io/nats-server/issues/3677
func TestJetStreamParallelStreamCreation(t *testing.T) {
func TestJetStreamClusterParallelStreamCreation(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -1414,7 +1414,7 @@ func TestJetStreamParallelStreamCreation(t *testing.T) {

// In addition to test above, if streams were attempted to be created in parallel
// it could be that multiple raft groups would be created for the same asset.
func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
func TestJetStreamClusterParallelStreamCreationDupeRaftGroups(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -1463,19 +1463,19 @@ func TestJetStreamParallelStreamCreationDupeRaftGroups(t *testing.T) {
expected := 2
rg := make(map[string]struct{})
for _, s := range c.servers {
s.mu.RLock()
s.rnMu.RLock()
for _, ni := range s.raftNodes {
n := ni.(*raft)
rg[n.Group()] = struct{}{}
}
s.mu.RUnlock()
s.rnMu.RUnlock()
}
if len(rg) != expected {
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
}
}

func TestJetStreamParallelConsumerCreation(t *testing.T) {
func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -1538,19 +1538,19 @@ func TestJetStreamParallelConsumerCreation(t *testing.T) {
expected := 3
rg := make(map[string]struct{})
for _, s := range c.servers {
s.mu.RLock()
s.rnMu.RLock()
for _, ni := range s.raftNodes {
n := ni.(*raft)
rg[n.Group()] = struct{}{}
}
s.mu.RUnlock()
s.rnMu.RUnlock()
}
if len(rg) != expected {
t.Fatalf("Expected only %d distinct raft groups for all servers, go %d", expected, len(rg))
}
}

func TestJetStreamGhostEphemeralsAfterRestart(t *testing.T) {
func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -3470,19 +3470,20 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
}()
defer close(qch)

s.mu.RLock()
gacc := s.gacc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a function for this IIRC.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The reason why I have explicit lock here is that in my first attempt I was also getting "s.js", which after some rework I realized that we don't really need it for this test. I will replace when opportunity presents itself.

s.mu.RUnlock()
if gacc == nil {
t.Fatalf("No global account")
}
// Make sure we do not have any R1 assets placed on the lameduck server.
for s.isRunning() {
s.mu.RLock()
if s.js == nil || s.js.srv == nil || s.js.srv.gacc == nil {
s.mu.RUnlock()
break
}
hasAsset := len(s.js.srv.gacc.streams()) > 0
s.mu.RUnlock()
if hasAsset {
if len(gacc.streams()) > 0 {
t.Fatalf("Server had an R1 asset when it should not due to lameduck mode")
}
time.Sleep(15 * time.Millisecond)
}
s.WaitForShutdown()
}

// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
Expand Down Expand Up @@ -3999,9 +4000,14 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
sa.Group.Cluster = _EMPTY_
sa.Group.Preferred = _EMPTY_
// Insert into meta layer.
s.mu.RLock()
s.js.cluster.meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
s.mu.RUnlock()
if sjs := s.getJetStream(); sjs != nil {
sjs.mu.RLock()
meta := sjs.cluster.meta
sjs.mu.RUnlock()
if meta != nil {
meta.ForwardProposal(encodeUpdateStreamAssignment(sa))
}
}
// Make sure it got propagated..
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
sa := mset.streamAssignment().copyGroup()
Expand Down Expand Up @@ -4714,7 +4720,7 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
require_True(t, si.State.Msgs == uint64(toSend))
}

func TestJetStreamBinaryStreamSnapshotCapability(t *testing.T) {
func TestJetStreamClusterBinaryStreamSnapshotCapability(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

Expand Down Expand Up @@ -4783,7 +4789,7 @@ func TestJetStreamClusterBadEncryptKey(t *testing.T) {
}
}

func TestJetStreamAccountUsageDrifts(t *testing.T) {
func TestJetStreamClusterAccountUsageDrifts(t *testing.T) {
tmpl := `
listen: 127.0.0.1:-1
server_name: %s
Expand Down
19 changes: 19 additions & 0 deletions server/jetstream_helpers_test.go
Expand Up @@ -281,6 +281,25 @@ var jsMixedModeGlobalAccountTempl = `

var jsGWTempl = `%s{name: %s, urls: [%s]}`

// jsClusterAccountLimitsTempl is a clustered JetStream server config
// template enforcing per-account JetStream limits: the $JS account is
// capped at 1MB of store and 0 memory, and $SYS is the system account.
// Sprintf placeholders, in order: server name, store directory,
// cluster name, cluster listen port, routes.
var jsClusterAccountLimitsTempl = `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}

	no_auth_user: js

	accounts {
		$JS { users = [ { user: "js", pass: "p" } ]; jetstream: {max_store: 1MB, max_mem: 0} }
		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
	}
`

func createJetStreamTaggedSuperCluster(t *testing.T) *supercluster {
return createJetStreamTaggedSuperClusterWithGWProxy(t, nil)
}
Expand Down
10 changes: 8 additions & 2 deletions server/server.go
Expand Up @@ -1272,6 +1272,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)

// Setup the account resolver. For memory resolver, make sure the JWTs are
// properly formed but do not enforce expiration etc.
// Lock is held on entry, but may be released/reacquired during this call.
func (s *Server) configureResolver() error {
opts := s.getOpts()
s.accResolver = opts.AccountResolver
Expand All @@ -1286,15 +1287,20 @@ func (s *Server) configureResolver() error {
}
}
if len(opts.resolverPreloads) > 0 {
if s.accResolver.IsReadOnly() {
// Lock ordering is account resolver -> server, so we need to release
// the lock and reacquire it when done with account resolver's calls.
ar := s.accResolver
s.mu.Unlock()
defer s.mu.Lock()
if ar.IsReadOnly() {
return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
}
for k, v := range opts.resolverPreloads {
_, err := jwt.DecodeAccountClaims(v)
if err != nil {
return fmt.Errorf("preload account error for %q: %v", k, err)
}
s.accResolver.Store(k, v)
ar.Store(k, v)
}
}
}
Expand Down