Skip to content

Commit

Permalink
[IMPROVED] Optimizations for large single hub account leafnode fleets. (
Browse files Browse the repository at this point in the history
#4135)

Added a leafnode lock to allow better traversal, without copying, of large
leafnode lists in a single hub account.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed May 6, 2023
2 parents 40ea58f + 80db7a2 commit 76f4358
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 62 deletions.
6 changes: 6 additions & 0 deletions locksordering.txt
Expand Up @@ -8,3 +8,9 @@ jetStream -> jsAccount -> stream -> consumer

A lock to protect jetstream account's usage has been introduced: jsAccount.usageMu.
This lock is independent and can be invoked under any other lock: jsAccount -> jsa.usageMu, stream -> jsa.usageMu, etc...

A lock to protect the account's leafnodes list was also introduced to
allow that lock to be held while then acquiring a client lock, which is not
possible with the normal account lock.

accountLeafList -> client
70 changes: 46 additions & 24 deletions server/accounts.go
Expand Up @@ -73,6 +73,7 @@ type Account struct {
lqws map[string]int32
usersRevoked map[string]int64
mappings []*mapping
lmu sync.RWMutex
lleafs []*client
leafClusters map[string]uint64
imports importMap
Expand Down Expand Up @@ -166,14 +167,17 @@ const (
Chunked
)

var commaSeparatorRegEx = regexp.MustCompile(`,\s*`)
var partitionMappingFunctionRegEx = regexp.MustCompile(`{{\s*[pP]artition\s*\((.*)\)\s*}}`)
var wildcardMappingFunctionRegEx = regexp.MustCompile(`{{\s*[wW]ildcard\s*\((.*)\)\s*}}`)
var splitFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[lL]eft\s*\((.*)\)\s*}}`)
var splitFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[rR]ight\s*\((.*)\)\s*}}`)
var sliceFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[lL]eft\s*\((.*)\)\s*}}`)
var sliceFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[rR]ight\s*\((.*)\)\s*}}`)
var splitMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit\s*\((.*)\)\s*}}`)
// Subject mapping and transform setups.
// Each pattern below is compiled exactly once at package init time
// (MustCompile panics on a bad pattern, which is acceptable for these
// compile-time constants). The bracketed first letter of every function
// name accepts both lower- and upper-case spellings, e.g. {{partition(...)}}
// and {{Partition(...)}}.
var commaSeparatorRegEx = regexp.MustCompile(`,\s*`)

// {{partition(args)}} — deterministic partitioning of a subject.
var partitionMappingFunctionRegEx = regexp.MustCompile(`{{\s*[pP]artition\s*\((.*)\)\s*}}`)

// {{wildcard(n)}} — reference to the n-th wildcard token.
var wildcardMappingFunctionRegEx = regexp.MustCompile(`{{\s*[wW]ildcard\s*\((.*)\)\s*}}`)

// {{splitFromLeft(args)}} / {{splitFromRight(args)}} — split a token at an offset.
var splitFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[lL]eft\s*\((.*)\)\s*}}`)
var splitFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit[fF]rom[rR]ight\s*\((.*)\)\s*}}`)

// {{sliceFromLeft(args)}} / {{sliceFromRight(args)}} — slice a token at an offset.
var sliceFromLeftMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[lL]eft\s*\((.*)\)\s*}}`)
var sliceFromRightMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]lice[fF]rom[rR]ight\s*\((.*)\)\s*}}`)

// {{split(sep)}} — split a token on a separator. Checked last since the
// pattern would also match the splitFrom* forms above.
var splitMappingFunctionRegEx = regexp.MustCompile(`{{\s*[sS]plit\s*\((.*)\)\s*}}`)

// Enum for the subject mapping transform function types
const (
Expand Down Expand Up @@ -374,12 +378,14 @@ func (a *Account) updateRemoteServer(m *AccountNumConns) []*client {
mtlce := a.mleafs != jwt.NoLimit && (a.nleafs+a.nrleafs > a.mleafs)
if mtlce {
// Take ones from the end.
a.lmu.RLock()
leafs := a.lleafs
over := int(a.nleafs + a.nrleafs - a.mleafs)
if over < len(leafs) {
leafs = leafs[len(leafs)-over:]
}
clients = append(clients, leafs...)
a.lmu.RUnlock()
}
a.mu.Unlock()

Expand Down Expand Up @@ -719,13 +725,15 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
a.mappings = append(a.mappings, m)

// If we have connected leafnodes make sure to update.
if len(a.lleafs) > 0 {
leafs := append([]*client(nil), a.lleafs...)
if a.nleafs > 0 {
// Need to release because lock ordering is client -> account
a.mu.Unlock()
for _, lc := range leafs {
// Now grab the leaf list lock. We can hold client lock under this one.
a.lmu.RLock()
for _, lc := range a.lleafs {
lc.forceAddToSmap(src)
}
a.lmu.RUnlock()
a.mu.Lock()
}
return nil
Expand Down Expand Up @@ -911,11 +919,17 @@ func (a *Account) addClient(c *client) int {
a.sysclients++
} else if c.kind == LEAF {
a.nleafs++
a.lleafs = append(a.lleafs, c)
}
}
a.mu.Unlock()

// If we added a new leaf use the list lock and add it to the list.
if added && c.kind == LEAF {
a.lmu.Lock()
a.lleafs = append(a.lleafs, c)
a.lmu.Unlock()
}

if c != nil && c.srv != nil && added {
c.srv.accConnsUpdate(a)
}
Expand Down Expand Up @@ -949,8 +963,12 @@ func (a *Account) isLeafNodeClusterIsolated(cluster string) bool {
// Helper function to remove leaf nodes. If number of leafnodes gets large
// this may need to be optimized out of linear search but believe number
// of active leafnodes per account scope to be small and therefore cache friendly.
// Lock should be held on account.
// Lock should not be held on general account lock.
func (a *Account) removeLeafNode(c *client) {
// Make sure we hold the list lock as well.
a.lmu.Lock()
defer a.lmu.Unlock()

ll := len(a.lleafs)
for i, l := range a.lleafs {
if l == c {
Expand All @@ -960,15 +978,6 @@ func (a *Account) removeLeafNode(c *client) {
} else {
a.lleafs = a.lleafs[:ll-1]
}
// Do cluster accounting if we are a hub.
if l.isHubLeafNode() {
cluster := l.remoteCluster()
if count := a.leafClusters[cluster]; count > 1 {
a.leafClusters[cluster]--
} else if count == 1 {
delete(a.leafClusters, cluster)
}
}
return
}
}
Expand All @@ -985,11 +994,24 @@ func (a *Account) removeClient(c *client) int {
a.sysclients--
} else if c.kind == LEAF {
a.nleafs--
a.removeLeafNode(c)
// Need to do cluster accounting here.
// Do cluster accounting if we are a hub.
if c.isHubLeafNode() {
cluster := c.remoteCluster()
if count := a.leafClusters[cluster]; count > 1 {
a.leafClusters[cluster]--
} else if count == 1 {
delete(a.leafClusters, cluster)
}
}
}
}
a.mu.Unlock()

if removed && c.kind == LEAF {
a.removeLeafNode(c)
}

if c != nil && c.srv != nil && removed {
c.srv.mu.Lock()
doRemove := a != c.srv.gacc
Expand Down Expand Up @@ -2022,7 +2044,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error {
// This is similar to what initLeafNodeSmapAndSendSubs does
// TODO we need to consider performing this update as we get client subscriptions.
// This behavior would result in subscription propagation only where actually used.
a.srv.updateLeafNodes(a, sub, 1)
a.updateLeafNodes(sub, 1)
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions server/client.go
Expand Up @@ -2560,7 +2560,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw
}
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, 1)
acc.updateLeafNodes(sub, 1)
return sub, nil
}

Expand Down Expand Up @@ -2859,7 +2859,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
}
}
// Now check on leafnode updates.
c.srv.updateLeafNodes(nsub.im.acc, nsub, -1)
nsub.im.acc.updateLeafNodes(nsub, -1)
}

// Now check to see if this was part of a respMap entry for service imports.
Expand Down Expand Up @@ -2923,7 +2923,7 @@ func (c *client) processUnsub(arg []byte) error {
}
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, -1)
acc.updateLeafNodes(sub, -1)
}

return nil
Expand Down Expand Up @@ -4911,7 +4911,7 @@ func (c *client) closeConnection(reason ClosedState) {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
}
}
srv.updateLeafNodes(acc, sub, -1)
acc.updateLeafNodes(sub, -1)
} else {
// We handle queue subscribers special in case we
// have a bunch we can just send one update to the
Expand All @@ -4936,7 +4936,7 @@ func (c *client) closeConnection(reason ClosedState) {
srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n))
}
}
srv.updateLeafNodes(acc, esub.sub, -(esub.n))
acc.updateLeafNodes(esub.sub, -(esub.n))
}
if prev := acc.removeClient(c); prev == 1 {
srv.decActiveAccounts()
Expand Down
10 changes: 6 additions & 4 deletions server/consumer.go
Expand Up @@ -1043,10 +1043,12 @@ func (o *consumer) setLeader(isLeader bool) {
}

var err error
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
if o.cfg.AckPolicy != AckNone {
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
o.mu.Unlock()
o.deleteWithoutAdvisory()
return
}
}

// Setup the internal sub for next message requests regardless.
Expand Down
70 changes: 45 additions & 25 deletions server/leafnode.go
Expand Up @@ -1639,11 +1639,11 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
return
}
// Collect all account subs here.
_subs := [32]*subscription{}
_subs := [1024]*subscription{}
subs := _subs[:0]
ims := []string{}

acc.mu.Lock()
acc.mu.RLock()
accName := acc.Name
accNTag := acc.nameTag

Expand Down Expand Up @@ -1682,11 +1682,15 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {

// Create a unique subject that will be used for loop detection.
lds := acc.lds
acc.mu.RUnlock()

// Check if we have to create the LDS.
if lds == _EMPTY_ {
lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
acc.mu.Lock()
acc.lds = lds
acc.mu.Unlock()
}
acc.mu.Unlock()

// Now check for gateway interest. Leafnodes will put this into
// the proper mode to propagate, but they are not held in the account.
Expand Down Expand Up @@ -1790,16 +1794,28 @@ func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscrip
s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
return
}
s.updateLeafNodes(acc, sub, delta)
acc.updateLeafNodes(sub, delta)
}

// updateLeafNodes will make sure to update the smap for the subscription. Will
// also forward to all leaf nodes as needed.
func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
if acc == nil || sub == nil {
return
}

// We will do checks for no leafnodes and same cluster here inline and under the
// general account read lock.
// If we feel we need to update the leafnodes we will do that out of line to avoid
// blocking routes or GWs.

acc.mu.RLock()
// First check if we even have leafnodes here.
if acc.nleafs == 0 {
acc.mu.RUnlock()
return
}

// Is this a loop detection subject.
isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

Expand All @@ -1809,51 +1825,52 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
cluster = string(sub.origin)
}

acc.mu.RLock()
// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
// Empty clusters will return false for the check.
if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
acc.mu.RUnlock()
return
}
// Grab all leaf nodes.
const numStackClients = 64
var _l [numStackClients]*client
leafs := append(_l[:0], acc.lleafs...)

// We can release the general account lock.
acc.mu.RUnlock()

for _, ln := range leafs {
// We can hold the list lock here to avoid having to copy a large slice.
acc.lmu.RLock()
defer acc.lmu.RUnlock()

// Do this once.
subject := string(sub.subject)

// Walk the connected leafnodes.
for _, ln := range acc.lleafs {
if ln == sub.client {
continue
}
// Check to make sure this sub does not have an origin cluster that matches the leafnode.
ln.mu.Lock()
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
ln.mu.Unlock()
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || (delta > 0 && !ln.canSubscribe(subject))
// If skipped, make sure that we still let go the "$LDS." subscription that allows
// the detection of a loop.
if isLDS || !skip {
ln.updateSmap(sub, delta)
}
ln.mu.Unlock()
}
}

// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32) {
key := keyFromSub(sub)

c.mu.Lock()
if c.leaf.smap == nil {
c.mu.Unlock()
return
}

// If we are solicited make sure this is a local client or a non-solicited leaf node
skind := sub.client.kind
updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
if c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
c.mu.Unlock()
return
}

Expand All @@ -1866,12 +1883,16 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
c.leaf.tsubt.Stop()
c.leaf.tsubt = nil
}
c.mu.Unlock()
return
}
}

n := c.leaf.smap[key]
key := keyFromSub(sub)
n, ok := c.leaf.smap[key]
if delta < 0 && !ok {
return
}

// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
update := sub.queue != nil || n == 0 || n+delta <= 0
n += delta
Expand All @@ -1883,7 +1904,6 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
if update {
c.sendLeafNodeSubUpdate(key, n)
}
c.mu.Unlock()
}

// Used to force add subjects to the subject map.
Expand Down Expand Up @@ -2097,7 +2117,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
}
// Now check on leafnode updates for other leaf nodes. We understand solicited
// and non-solicited state in this call so we will do the right thing.
srv.updateLeafNodes(acc, sub, delta)
acc.updateLeafNodes(sub, delta)

return nil
}
Expand Down Expand Up @@ -2154,7 +2174,7 @@ func (c *client) processLeafUnsub(arg []byte) error {
}
}
// Now check on leafnode updates for other leaf nodes.
srv.updateLeafNodes(acc, sub, -1)
acc.updateLeafNodes(sub, -1)
return nil
}

Expand Down

0 comments on commit 76f4358

Please sign in to comment.