Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Reduce contention for high connections in a JetStream enabled account with high API usage. #4613

Merged
merged 1 commit into from Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/consumer.go
Expand Up @@ -1556,7 +1556,7 @@ func (o *consumer) deleteNotActive() {
}
}

s, js := o.mset.srv, o.mset.srv.js
s, js := o.mset.srv, o.srv.js.Load()
acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct
o.mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion server/events.go
Expand Up @@ -875,7 +875,7 @@ func (s *Server) sendStatsz(subj string) {
m.Stats.ActiveServers = len(s.sys.servers) + 1

// JetStream
if js := s.js; js != nil {
if js := s.js.Load(); js != nil {
jStat := &JetStreamVarz{}
s.mu.RUnlock()
js.mu.RLock()
Expand Down
29 changes: 15 additions & 14 deletions server/filestore.go
Expand Up @@ -4729,29 +4729,30 @@ func (fs *fileStore) syncBlocks() {
mb.mu.Unlock()
continue
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
// Check if we need to sync. We will not hold lock during actual sync.
var fn string
if mb.needSync {
// Flush anything that may be pending.
if mb.pendingWriteSizeLocked() > 0 {
mb.flushPendingMsgsLocked()
}
if mb.mfd != nil {
mb.mfd.Sync()
} else {
fd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
mb.mu.Unlock()
continue
}
fn = mb.mfn
mb.needSync = false
}
mb.mu.Unlock()

// Check if we need to sync.
// This is done not holding any locks.
if fn != _EMPTY_ {
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
fd.Sync()
fd.Close()
}
mb.needSync = false
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
mb.mu.Unlock()
}

fs.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion server/gateway.go
Expand Up @@ -1128,8 +1128,8 @@ func (c *client) processGatewayInfo(info *Info) {
// connect events to switch those accounts into interest only mode.
s.mu.Lock()
s.ensureGWsInterestOnlyForLeafNodes()
js := s.js
s.mu.Unlock()
js := s.js.Load()

// If running in some tests, maintain the original behavior.
if gwDoNotForceInterestOnlyMode && js != nil {
Expand Down
76 changes: 25 additions & 51 deletions server/jetstream.go
Expand Up @@ -117,9 +117,11 @@ type jetStream struct {
// Some bools regarding general state.
metaRecovering bool
standAlone bool
disabled bool
oos bool
shuttingDown bool

// Atomic versions
disabled atomic.Bool
}

type remoteUsage struct {
Expand Down Expand Up @@ -372,9 +374,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
}
s.gcbMu.Unlock()

s.mu.Lock()
s.js = js
s.mu.Unlock()
s.js.Store(js)

// FIXME(dlc) - Allow memory only operation?
if stat, err := os.Stat(cfg.StoreDir); os.IsNotExist(err) {
Expand Down Expand Up @@ -530,10 +530,7 @@ func (s *Server) setupJetStreamExports() {
}

func (s *Server) jetStreamOOSPending() (wasPending bool) {
s.mu.Lock()
js := s.js
s.mu.Unlock()
if js != nil {
if js := s.getJetStream(); js != nil {
js.mu.Lock()
wasPending = js.oos
js.oos = true
Expand All @@ -543,13 +540,8 @@ func (s *Server) jetStreamOOSPending() (wasPending bool) {
}

func (s *Server) setJetStreamDisabled() {
s.mu.Lock()
js := s.js
s.mu.Unlock()
if js != nil {
js.mu.Lock()
js.disabled = true
js.mu.Unlock()
if js := s.getJetStream(); js != nil {
js.disabled.Store(true)
}
}

Expand Down Expand Up @@ -738,16 +730,15 @@ func (s *Server) configAllJetStreamAccounts() error {
// a non-default system account.
s.checkJetStreamExports()

// Snapshot into our own list. Might not be needed.
s.mu.Lock()
// Bail if server not enabled. If it was enabled and a reload turns it off
// that will be handled elsewhere.
js := s.js
js := s.getJetStream()
if js == nil {
s.mu.Unlock()
return nil
}

// Snapshot into our own list. Might not be needed.
s.mu.RLock()
if s.sys != nil {
// clustered stream removal will perform this cleanup as well
// this is mainly for initial cleanup
Expand All @@ -764,12 +755,12 @@ func (s *Server) configAllJetStreamAccounts() error {
}

var jsAccounts []*Account
s.accounts.Range(func(k, v interface{}) bool {
s.accounts.Range(func(k, v any) bool {
jsAccounts = append(jsAccounts, v.(*Account))
return true
})
accounts := &s.accounts
s.mu.Unlock()
s.mu.RUnlock()

// Process any jetstream enabled accounts here. These will be accounts we are
// already aware of at startup etc.
Expand Down Expand Up @@ -809,9 +800,7 @@ func (js *jetStream) isEnabled() bool {
if js == nil {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()
return !js.disabled
return !js.disabled.Load()
}

// Mark that we will be in standlone mode.
Expand All @@ -821,9 +810,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {
}
js.mu.Lock()
defer js.mu.Unlock()
js.standAlone = isStandAlone

if isStandAlone {
if js.standAlone = isStandAlone; js.standAlone {
// Update our server atomic.
js.srv.isMetaLeader.Store(true)
js.accountPurge, _ = js.srv.systemSubscribe(JSApiAccountPurge, _EMPTY_, false, nil, js.srv.jsLeaderAccountPurgeRequest)
} else if js.accountPurge != nil {
js.srv.sysUnsubscribe(js.accountPurge)
Expand All @@ -832,11 +821,7 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {

// JetStreamEnabled reports if jetstream is enabled for this server.
func (s *Server) JetStreamEnabled() bool {
var js *jetStream
s.mu.RLock()
js = s.js
s.mu.RUnlock()
return js.isEnabled()
return s.getJetStream().isEnabled()
}

// JetStreamEnabledForDomain will report if any servers have JetStream enabled within this domain.
Expand Down Expand Up @@ -909,10 +894,7 @@ func (js *jetStream) isShuttingDown() bool {

// Shutdown jetstream for this server.
func (s *Server) shutdownJetStream() {
s.mu.RLock()
js := s.js
s.mu.RUnlock()

js := s.getJetStream()
if js == nil {
return
}
Expand Down Expand Up @@ -951,9 +933,7 @@ func (s *Server) shutdownJetStream() {
a.removeJetStream()
}

s.mu.Lock()
s.js = nil
s.mu.Unlock()
s.js.Store(nil)

js.mu.Lock()
js.accounts = nil
Expand Down Expand Up @@ -994,23 +974,20 @@ func (s *Server) shutdownJetStream() {
// created a dynamic configuration. A copy is returned.
func (s *Server) JetStreamConfig() *JetStreamConfig {
var c *JetStreamConfig
s.mu.Lock()
if s.js != nil {
copy := s.js.config
if js := s.getJetStream(); js != nil {
copy := js.config
c = &(copy)
}
s.mu.Unlock()
return c
}

// StoreDir returns the current JetStream directory.
func (s *Server) StoreDir() string {
s.mu.Lock()
defer s.mu.Unlock()
if s.js == nil {
js := s.getJetStream()
if js == nil {
return _EMPTY_
}
return s.js.config.StoreDir
return js.config.StoreDir
}

// JetStreamNumAccounts returns the number of enabled accounts this server is tracking.
Expand All @@ -1036,10 +1013,7 @@ func (s *Server) JetStreamReservedResources() (int64, int64, error) {
}

func (s *Server) getJetStream() *jetStream {
s.mu.RLock()
js := s.js
s.mu.RUnlock()
return js
return s.js.Load()
}

func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits) {
Expand Down
26 changes: 14 additions & 12 deletions server/jetstream_api.go
Expand Up @@ -2314,14 +2314,15 @@ func (s *Server) peerSetToNames(ps []string) []string {
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
js.mu.RLock()
cc := js.cluster
defer js.mu.RUnlock()
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == serverName {
if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
return p.ID
if cc := js.cluster; cc != nil {
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == serverName {
if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
return p.ID
}
}
}
}
Expand Down Expand Up @@ -4217,11 +4218,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
// We have a consumer assignment.
js.mu.RLock()

var node RaftNode
var leaderNotPartOfGroup bool
var isMember bool

var (
node RaftNode
leaderNotPartOfGroup bool
isMember bool
)
rg := ca.Group
if rg != nil && rg.isMember(ourID) {
isMember = true
Expand All @@ -4233,6 +4234,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
}
js.mu.RUnlock()

// Check if we should ignore all together.
if node == nil {
// We have been assigned but have not created a node yet. If we are a member return
Expand Down