Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Move some contended locks to atomic.Bools #4585

Merged
merged 3 commits into from Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 9 additions & 14 deletions server/accounts.go
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/nats-io/jwt/v2"
Expand Down Expand Up @@ -72,6 +73,7 @@ type Account struct {
lqws map[string]int32
usersRevoked map[string]int64
mappings []*mapping
hasMapped atomic.Bool
lmu sync.RWMutex
lleafs []*client
leafClusters map[string]uint64
Expand Down Expand Up @@ -291,6 +293,8 @@ func (a *Account) shallowCopy(na *Account) {
if len(na.mappings) > 0 && na.prand == nil {
na.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
na.hasMapped.Store(len(na.mappings) > 0)

// JetStream
na.jsLimits = a.jsLimits
// Server config account limits.
Expand Down Expand Up @@ -703,6 +707,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
}
// If we did not replace add to the end.
a.mappings = append(a.mappings, m)
a.hasMapped.Store(len(a.mappings) > 0)

// If we have connected leafnodes make sure to update.
if a.nleafs > 0 {
Expand All @@ -729,6 +734,7 @@ func (a *Account) RemoveMapping(src string) bool {
a.mappings[i] = a.mappings[len(a.mappings)-1]
a.mappings[len(a.mappings)-1] = nil // gc
a.mappings = a.mappings[:len(a.mappings)-1]
a.hasMapped.Store(len(a.mappings) > 0)
return true
}
}
Expand All @@ -740,28 +746,17 @@ func (a *Account) hasMappings() bool {
if a == nil {
return false
}
a.mu.RLock()
hm := a.hasMappingsLocked()
a.mu.RUnlock()
return hm
}

// Indicates we have mapping entries.
// The account has been verified to be non-nil.
// Read or Write lock held on entry.
func (a *Account) hasMappingsLocked() bool {
return len(a.mappings) > 0
return a.hasMapped.Load()
}

// This performs the logic to map to a new dest subject based on mappings.
// Should only be called from processInboundClientMsg or service import processing.
func (a *Account) selectMappedSubject(dest string) (string, bool) {
a.mu.RLock()
if len(a.mappings) == 0 {
a.mu.RUnlock()
if !a.hasMappings() {
return dest, false
}

a.mu.RLock()
// In case we have to tokenize for subset matching.
tsa := [32]string{}
tts := tsa[:0]
Expand Down
2 changes: 1 addition & 1 deletion server/auth.go
Expand Up @@ -983,7 +983,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
acc.mu.RLock()
c.Debugf("Authenticated JWT: %s %q (claim-name: %q, claim-tags: %q) "+
"signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q has mappings %t accused %p",
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappingsLocked(), acc)
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappings(), acc)
acc.mu.RUnlock()
return true
}
Expand Down
4 changes: 2 additions & 2 deletions server/client.go
Expand Up @@ -3735,7 +3735,7 @@ func (c *client) processInboundMsg(msg []byte) {
}
}

// selectMappedSubject will chose the mapped subject based on the client's inbound subject.
// selectMappedSubject will choose the mapped subject based on the client's inbound subject.
func (c *client) selectMappedSubject() bool {
nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject))
if changed {
Expand Down Expand Up @@ -5225,7 +5225,7 @@ func (c *client) reconnect() {

// It is possible that the server is being shutdown.
// If so, don't try to reconnect
if !srv.running {
if !srv.isRunning() {
return
}

Expand Down
8 changes: 4 additions & 4 deletions server/events.go
Expand Up @@ -715,7 +715,7 @@ func (s *Server) eventsRunning() bool {
return false
}
s.mu.RLock()
er := s.running && s.eventsEnabled()
er := s.isRunning() && s.eventsEnabled()
s.mu.RUnlock()
return er
}
Expand All @@ -739,7 +739,7 @@ func (s *Server) eventsEnabled() bool {
func (s *Server) TrackedRemoteServers() int {
s.mu.RLock()
defer s.mu.RUnlock()
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
return -1
}
return len(s.sys.servers)
Expand Down Expand Up @@ -1484,7 +1484,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su

// Should do normal updates before bailing if wrong domain.
s.mu.Lock()
if s.running && s.eventsEnabled() && ssm.Server.ID != s.info.ID {
if s.isRunning() && s.eventsEnabled() && ssm.Server.ID != s.info.ID {
s.updateRemoteServer(&si)
}
s.mu.Unlock()
Expand Down Expand Up @@ -1943,7 +1943,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub
s.mu.Lock()

// check again here if we have been shutdown.
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
s.mu.Unlock()
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Expand Up @@ -183,7 +183,7 @@ const (
func (s *Server) trackedJetStreamServers() (js, total int) {
s.mu.RLock()
defer s.mu.RUnlock()
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
return -1, -1
}
s.nodeToInfo.Range(func(k, v interface{}) bool {
Expand Down
2 changes: 1 addition & 1 deletion server/mqtt.go
Expand Up @@ -499,7 +499,7 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client {
c.mu.Unlock()

s.mu.Lock()
if !s.running || s.ldm {
if !s.isRunning() || s.ldm {
if s.shutdown {
conn.Close()
}
Expand Down
6 changes: 4 additions & 2 deletions server/route.go
Expand Up @@ -1834,7 +1834,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
id := info.ID

s.mu.Lock()
if !s.running || s.routesReject {
if !s.isRunning() || s.routesReject {
s.mu.Unlock()
return false
}
Expand Down Expand Up @@ -2654,7 +2654,9 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error
// We will take on their name since theirs is configured or higher then ours.
srv.setClusterName(proto.Cluster)
if !proto.Dynamic {
srv.getOpts().Cluster.Name = proto.Cluster
srv.optsMu.Lock()
srv.opts.Cluster.Name = proto.Cluster
srv.optsMu.Unlock()
}
c.mu.Lock()
remoteID := c.opts.Name
Expand Down
13 changes: 5 additions & 8 deletions server/server.go
Expand Up @@ -131,7 +131,7 @@ type Server struct {
configFile string
optsMu sync.RWMutex
opts *Options
running bool
running atomic.Bool
shutdown bool
listener net.Listener
listenerErr error
Expand Down Expand Up @@ -1482,10 +1482,7 @@ func (s *Server) Running() bool {

// Protected check on running state
func (s *Server) isRunning() bool {
s.mu.RLock()
running := s.running
s.mu.RUnlock()
return running
return s.running.Load()
}

func (s *Server) logPid() error {
Expand Down Expand Up @@ -2083,8 +2080,8 @@ func (s *Server) Start() {
s.checkAuthforWarnings()

// Avoid RACE between Start() and Shutdown()
s.running.Store(true)
s.mu.Lock()
s.running = true
// Update leafNodeEnabled in case options have changed post NewServer()
// and before Start() (we should not be able to allow that, but server has
// direct reference to user-provided options - at least before a Reload() is
Expand Down Expand Up @@ -2375,7 +2372,7 @@ func (s *Server) Shutdown() {
opts := s.getOpts()

s.shutdown = true
s.running = false
s.running.Store(false)
s.grMu.Lock()
s.grRunning = false
s.grMu.Unlock()
Expand Down Expand Up @@ -3041,7 +3038,7 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
// list of connections to close. It won't contain this one, so we need
// to bail out now otherwise the readLoop started down there would not
// be interrupted. Skip also if in lame duck mode.
if !s.running || s.ldm {
if !s.isRunning() || s.ldm {
// There are some tests that create a server but don't start it,
// and use "async" clients and perform the parsing manually. Such
// clients would branch here (since server is not running). However,
Expand Down
4 changes: 1 addition & 3 deletions server/server_test.go
Expand Up @@ -1222,9 +1222,7 @@ func TestServerValidateGatewaysOptions(t *testing.T) {
func TestAcceptError(t *testing.T) {
o := DefaultOptions()
s := New(o)
s.mu.Lock()
s.running = true
s.mu.Unlock()
s.running.Store(true)
defer s.Shutdown()
orgDelay := time.Hour
delay := s.acceptError("Test", fmt.Errorf("any error"), orgDelay)
Expand Down
2 changes: 1 addition & 1 deletion server/websocket.go
Expand Up @@ -1220,7 +1220,7 @@ func (s *Server) createWSClient(conn net.Conn, ws *websocket) *client {
c.mu.Unlock()

s.mu.Lock()
if !s.running || s.ldm {
if !s.isRunning() || s.ldm {
if s.shutdown {
conn.Close()
}
Expand Down