Skip to content

Commit

Permalink
[IMPROVED] Move some contended locks to atomic.Bools (#4585)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 25, 2023
2 parents e594da5 + a002918 commit 2e12b87
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 37 deletions.
23 changes: 9 additions & 14 deletions server/accounts.go
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/nats-io/jwt/v2"
Expand Down Expand Up @@ -72,6 +73,7 @@ type Account struct {
lqws map[string]int32
usersRevoked map[string]int64
mappings []*mapping
hasMapped atomic.Bool
lmu sync.RWMutex
lleafs []*client
leafClusters map[string]uint64
Expand Down Expand Up @@ -291,6 +293,8 @@ func (a *Account) shallowCopy(na *Account) {
if len(na.mappings) > 0 && na.prand == nil {
na.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
na.hasMapped.Store(len(na.mappings) > 0)

// JetStream
na.jsLimits = a.jsLimits
// Server config account limits.
Expand Down Expand Up @@ -703,6 +707,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
}
// If we did not replace add to the end.
a.mappings = append(a.mappings, m)
a.hasMapped.Store(len(a.mappings) > 0)

// If we have connected leafnodes make sure to update.
if a.nleafs > 0 {
Expand All @@ -729,6 +734,7 @@ func (a *Account) RemoveMapping(src string) bool {
a.mappings[i] = a.mappings[len(a.mappings)-1]
a.mappings[len(a.mappings)-1] = nil // gc
a.mappings = a.mappings[:len(a.mappings)-1]
a.hasMapped.Store(len(a.mappings) > 0)
return true
}
}
Expand All @@ -740,28 +746,17 @@ func (a *Account) hasMappings() bool {
if a == nil {
return false
}
a.mu.RLock()
hm := a.hasMappingsLocked()
a.mu.RUnlock()
return hm
}

// Indicates we have mapping entries.
// The account has been verified to be non-nil.
// Read or Write lock held on entry.
func (a *Account) hasMappingsLocked() bool {
return len(a.mappings) > 0
return a.hasMapped.Load()
}

// This performs the logic to map to a new dest subject based on mappings.
// Should only be called from processInboundClientMsg or service import processing.
func (a *Account) selectMappedSubject(dest string) (string, bool) {
a.mu.RLock()
if len(a.mappings) == 0 {
a.mu.RUnlock()
if !a.hasMappings() {
return dest, false
}

a.mu.RLock()
// In case we have to tokenize for subset matching.
tsa := [32]string{}
tts := tsa[:0]
Expand Down
2 changes: 1 addition & 1 deletion server/auth.go
Expand Up @@ -983,7 +983,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
acc.mu.RLock()
c.Debugf("Authenticated JWT: %s %q (claim-name: %q, claim-tags: %q) "+
"signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q has mappings %t accused %p",
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappingsLocked(), acc)
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappings(), acc)
acc.mu.RUnlock()
return true
}
Expand Down
4 changes: 2 additions & 2 deletions server/client.go
Expand Up @@ -3735,7 +3735,7 @@ func (c *client) processInboundMsg(msg []byte) {
}
}

// selectMappedSubject will chose the mapped subject based on the client's inbound subject.
// selectMappedSubject will choose the mapped subject based on the client's inbound subject.
func (c *client) selectMappedSubject() bool {
nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject))
if changed {
Expand Down Expand Up @@ -5225,7 +5225,7 @@ func (c *client) reconnect() {

// It is possible that the server is being shutdown.
// If so, don't try to reconnect
if !srv.running {
if !srv.isRunning() {
return
}

Expand Down
8 changes: 4 additions & 4 deletions server/events.go
Expand Up @@ -715,7 +715,7 @@ func (s *Server) eventsRunning() bool {
return false
}
s.mu.RLock()
er := s.running && s.eventsEnabled()
er := s.isRunning() && s.eventsEnabled()
s.mu.RUnlock()
return er
}
Expand All @@ -739,7 +739,7 @@ func (s *Server) eventsEnabled() bool {
func (s *Server) TrackedRemoteServers() int {
s.mu.RLock()
defer s.mu.RUnlock()
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
return -1
}
return len(s.sys.servers)
Expand Down Expand Up @@ -1484,7 +1484,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su

// Should do normal updates before bailing if wrong domain.
s.mu.Lock()
if s.running && s.eventsEnabled() && ssm.Server.ID != s.info.ID {
if s.isRunning() && s.eventsEnabled() && ssm.Server.ID != s.info.ID {
s.updateRemoteServer(&si)
}
s.mu.Unlock()
Expand Down Expand Up @@ -1943,7 +1943,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub
s.mu.Lock()

// check again here if we have been shutdown.
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
s.mu.Unlock()
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Expand Up @@ -183,7 +183,7 @@ const (
func (s *Server) trackedJetStreamServers() (js, total int) {
s.mu.RLock()
defer s.mu.RUnlock()
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
return -1, -1
}
s.nodeToInfo.Range(func(k, v interface{}) bool {
Expand Down
2 changes: 1 addition & 1 deletion server/mqtt.go
Expand Up @@ -499,7 +499,7 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client {
c.mu.Unlock()

s.mu.Lock()
if !s.running || s.ldm {
if !s.isRunning() || s.ldm {
if s.shutdown {
conn.Close()
}
Expand Down
6 changes: 4 additions & 2 deletions server/route.go
Expand Up @@ -1834,7 +1834,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
id := info.ID

s.mu.Lock()
if !s.running || s.routesReject {
if !s.isRunning() || s.routesReject {
s.mu.Unlock()
return false
}
Expand Down Expand Up @@ -2654,7 +2654,9 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error
// We will take on their name since theirs is configured or higher then ours.
srv.setClusterName(proto.Cluster)
if !proto.Dynamic {
srv.getOpts().Cluster.Name = proto.Cluster
srv.optsMu.Lock()
srv.opts.Cluster.Name = proto.Cluster
srv.optsMu.Unlock()
}
c.mu.Lock()
remoteID := c.opts.Name
Expand Down
13 changes: 5 additions & 8 deletions server/server.go
Expand Up @@ -131,7 +131,7 @@ type Server struct {
configFile string
optsMu sync.RWMutex
opts *Options
running bool
running atomic.Bool
shutdown bool
listener net.Listener
listenerErr error
Expand Down Expand Up @@ -1482,10 +1482,7 @@ func (s *Server) Running() bool {

// Protected check on running state
func (s *Server) isRunning() bool {
s.mu.RLock()
running := s.running
s.mu.RUnlock()
return running
return s.running.Load()
}

func (s *Server) logPid() error {
Expand Down Expand Up @@ -2083,8 +2080,8 @@ func (s *Server) Start() {
s.checkAuthforWarnings()

// Avoid RACE between Start() and Shutdown()
s.running.Store(true)
s.mu.Lock()
s.running = true
// Update leafNodeEnabled in case options have changed post NewServer()
// and before Start() (we should not be able to allow that, but server has
// direct reference to user-provided options - at least before a Reload() is
Expand Down Expand Up @@ -2375,7 +2372,7 @@ func (s *Server) Shutdown() {
opts := s.getOpts()

s.shutdown = true
s.running = false
s.running.Store(false)
s.grMu.Lock()
s.grRunning = false
s.grMu.Unlock()
Expand Down Expand Up @@ -3041,7 +3038,7 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
// list of connections to close. It won't contain this one, so we need
// to bail out now otherwise the readLoop started down there would not
// be interrupted. Skip also if in lame duck mode.
if !s.running || s.ldm {
if !s.isRunning() || s.ldm {
// There are some tests that create a server but don't start it,
// and use "async" clients and perform the parsing manually. Such
// clients would branch here (since server is not running). However,
Expand Down
4 changes: 1 addition & 3 deletions server/server_test.go
Expand Up @@ -1222,9 +1222,7 @@ func TestServerValidateGatewaysOptions(t *testing.T) {
func TestAcceptError(t *testing.T) {
o := DefaultOptions()
s := New(o)
s.mu.Lock()
s.running = true
s.mu.Unlock()
s.running.Store(true)
defer s.Shutdown()
orgDelay := time.Hour
delay := s.acceptError("Test", fmt.Errorf("any error"), orgDelay)
Expand Down
2 changes: 1 addition & 1 deletion server/websocket.go
Expand Up @@ -1220,7 +1220,7 @@ func (s *Server) createWSClient(conn net.Conn, ws *websocket) *client {
c.mu.Unlock()

s.mu.Lock()
if !s.running || s.ldm {
if !s.isRunning() || s.ldm {
if s.shutdown {
conn.Close()
}
Expand Down

0 comments on commit 2e12b87

Please sign in to comment.