Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Updating of a large fleet of leafnodes. #4117

Merged
merged 1 commit into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 34 additions & 1 deletion server/accounts.go
@@ -1,4 +1,4 @@
// Copyright 2018-2022 The NATS Authors
// Copyright 2018-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -74,6 +74,7 @@ type Account struct {
usersRevoked map[string]int64
mappings []*mapping
lleafs []*client
leafClusters map[string]uint64
imports importMap
exports exportMap
js *jsAccount
Expand Down Expand Up @@ -921,6 +922,29 @@ func (a *Account) addClient(c *client) int {
return n
}

// registerLeafNodeCluster bumps the number of leafnode connections tracked
// for the given remote cluster name, lazily creating the tracking map on
// first use. Registration is only done when we are acting as the hub.
// Acquires the account lock.
func (a *Account) registerLeafNodeCluster(cluster string) {
	a.mu.Lock()
	defer a.mu.Unlock()

	if a.leafClusters != nil {
		a.leafClusters[cluster]++
		return
	}
	// First registration for this account: start the count at one.
	a.leafClusters = map[string]uint64{cluster: 1}
}

// isLeafNodeClusterIsolated reports whether the named cluster is the only
// leafnode cluster registered with this account. An empty cluster name is
// never considered isolated.
// Read Lock should be held.
func (a *Account) isLeafNodeClusterIsolated(cluster string) bool {
	// Isolated means: a named cluster, no other cluster registered, and at
	// least one connection counted for this one.
	return cluster != _EMPTY_ && len(a.leafClusters) <= 1 && a.leafClusters[cluster] > 0
}

// Helper function to remove leaf nodes. If the number of leafnodes gets large
// this may need to be optimized away from a linear search, but we believe the number
// of active leafnodes per account scope to be small and therefore cache friendly.
Expand All @@ -935,6 +959,15 @@ func (a *Account) removeLeafNode(c *client) {
} else {
a.lleafs = a.lleafs[:ll-1]
}
// Do cluster accounting if we are a hub.
if l.isHubLeafNode() {
cluster := l.remoteCluster()
if count := a.leafClusters[cluster]; count > 1 {
a.leafClusters[cluster]--
} else if count == 1 {
delete(a.leafClusters, cluster)
}
}
return
}
}
Expand Down
50 changes: 42 additions & 8 deletions server/jetstream_helpers_test.go
Expand Up @@ -115,6 +115,13 @@ var jsClusterAccountsTempl = `
routes = [%s]
}

websocket {
listen: 127.0.0.1:-1
compression: true
handshake_timeout: "5s"
no_tls: true
}

no_auth_user: one

accounts {
Expand Down Expand Up @@ -904,6 +911,18 @@ var jsClusterTemplWithSingleLeafNode = `
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`

// jsClusterTemplWithSingleFleetLeafNode is the config template for a single
// JetStream-enabled leafnode server that reports its cluster name as "fleet".
// The two %s verbs are filled, in order, with the server name and the
// JetStream store directory. The {{leaf}} marker is substituted with the
// generated leafnode remotes section by the createLeafSolicit* helpers.
var jsClusterTemplWithSingleFleetLeafNode = `
listen: 127.0.0.1:-1
server_name: %s
cluster: { name: fleet }
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

{{leaf}}

# For access to system account.
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`

var jsClusterTemplWithSingleLeafNodeNoJS = `
listen: 127.0.0.1:-1
server_name: %s
Expand Down Expand Up @@ -972,8 +991,12 @@ func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {
}

// createLeafNodeWithTemplateNoSystem starts a leafnode server from the given
// template without a system account remote, soliciting the cluster over the
// default "nats" URL scheme.
func (c *cluster) createLeafNodeWithTemplateNoSystem(name, template string) *Server {
	const defaultProto = "nats"
	return c.createLeafNodeWithTemplateNoSystemWithProto(name, template, defaultProto)
}

func (c *cluster) createLeafNodeWithTemplateNoSystemWithProto(name, template, proto string) *Server {
c.t.Helper()
tmpl := c.createLeafSolicitNoSystem(template)
tmpl := c.createLeafSolicitNoSystemWithProto(template, proto)
conf := fmt.Sprintf(tmpl, name, c.t.TempDir())
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
c.servers = append(c.servers, s)
Expand All @@ -983,6 +1006,10 @@ func (c *cluster) createLeafNodeWithTemplateNoSystem(name, template string) *Ser

// createLeafSolicit generates the leaf solicit config from the given template,
// using the default "nats" URL scheme for the remote URLs.
func (c *cluster) createLeafSolicit(tmpl string) string {
	const defaultProto = "nats"
	return c.createLeafSolicitWithProto(tmpl, defaultProto)
}

func (c *cluster) createLeafSolicitWithProto(tmpl, proto string) string {
c.t.Helper()

// Create our leafnode cluster template first.
Expand All @@ -992,28 +1019,35 @@ func (c *cluster) createLeafSolicit(tmpl string) string {
continue
}
ln := s.getOpts().LeafNode
lns = append(lns, fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port))
lnss = append(lnss, fmt.Sprintf("nats://admin:s3cr3t!@%s:%d", ln.Host, ln.Port))
lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port))
lnss = append(lnss, fmt.Sprintf("%s://admin:s3cr3t!@%s:%d", proto, ln.Host, ln.Port))
}
lnc := strings.Join(lns, ", ")
lnsc := strings.Join(lnss, ", ")
lconf := fmt.Sprintf(jsLeafFrag, lnc, lnsc)
return strings.Replace(tmpl, "{{leaf}}", lconf, 1)
}

func (c *cluster) createLeafSolicitNoSystem(tmpl string) string {
func (c *cluster) createLeafSolicitNoSystemWithProto(tmpl, proto string) string {
c.t.Helper()

// Create our leafnode cluster template first.
var lns string
var lns []string
for _, s := range c.servers {
if s.ClusterName() != c.name {
continue
}
ln := s.getOpts().LeafNode
lns = fmt.Sprintf("nats://%s:%d", ln.Host, ln.Port)
switch proto {
case "nats", "tls":
ln := s.getOpts().LeafNode
lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port))
case "ws", "wss":
ln := s.getOpts().Websocket
lns = append(lns, fmt.Sprintf("%s://%s:%d", proto, ln.Host, ln.Port))
}
}
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(jsLeafNoSysFrag, lns), 1)
lnc := strings.Join(lns, ", ")
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(jsLeafNoSysFrag, lnc), 1)
}

func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName string, numJsServers, numNonServers int, doJSConfig bool) *cluster {
Expand Down
84 changes: 50 additions & 34 deletions server/leafnode.go
@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -39,29 +39,31 @@ import (
"github.com/nats-io/nuid"
)

// Warning when user configures leafnode TLS insecure
const leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"
const (
// Warning when user configures leafnode TLS insecure
leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

// When a loop is detected, delay the reconnect of solicited connection.
const leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
// When a loop is detected, delay the reconnect of solicited connection.
leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

// When a server receives a message causing a permission violation, the
// connection is closed and it won't attempt to reconnect for that long.
const leafNodeReconnectAfterPermViolation = 30 * time.Second
// When a server receives a message causing a permission violation, the
// connection is closed and it won't attempt to reconnect for that long.
leafNodeReconnectAfterPermViolation = 30 * time.Second

// When we have the same cluster name as the hub.
const leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second
// When we have the same cluster name as the hub.
leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

// Prefix for loop detection subject
const leafNodeLoopDetectionSubjectPrefix = "$LDS."
// Prefix for loop detection subject
leafNodeLoopDetectionSubjectPrefix = "$LDS."

// Path added to URL to indicate to WS server that the connection is a
// LEAF connection as opposed to a CLIENT.
const leafNodeWSPath = "/leafnode"
// Path added to URL to indicate to WS server that the connection is a
// LEAF connection as opposed to a CLIENT.
leafNodeWSPath = "/leafnode"

// This is the time the server will wait, when receiving a CONNECT,
// before closing the connection if the required minimum version is not met.
const leafNodeWaitBeforeClose = 5 * time.Second
// This is the time the server will wait, when receiving a CONNECT,
// before closing the connection if the required minimum version is not met.
leafNodeWaitBeforeClose = 5 * time.Second
)

type leaf struct {
// We have any auth stuff here for solicited connections.
Expand Down Expand Up @@ -1579,6 +1581,11 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro

c.mu.Unlock()

// Register the cluster, even if empty, as long as we are acting as a hub.
if !proto.Hub {
c.acc.registerLeafNodeCluster(proto.Cluster)
}

// Add in the leafnode here since we passed through auth at this point.
s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)

Expand Down Expand Up @@ -1793,32 +1800,41 @@ func (s *Server) updateLeafNodes(acc *Account, sub *subscription, delta int32) {
return
}

_l := [32]*client{}
leafs := _l[:0]
// Is this a loop detection subject.
isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

// Capture the cluster even if it is empty.
cluster := _EMPTY_
if sub.origin != nil {
cluster = string(sub.origin)
}

// Grab all leaf nodes. Ignore a leafnode if sub's client is a leafnode and matches.
acc.mu.RLock()
for _, ln := range acc.lleafs {
if ln != sub.client {
leafs = append(leafs, ln)
}
// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
// Empty clusters will return false for the check.
if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
acc.mu.RUnlock()
return
}
// Grab all leaf nodes.
const numStackClients = 64
var _l [numStackClients]*client
leafs := append(_l[:0], acc.lleafs...)
acc.mu.RUnlock()

for _, ln := range leafs {
// Check to make sure this sub does not have an origin cluster than matches the leafnode.
if ln == sub.client {
continue
}
// Check to make sure this sub does not have an origin cluster that matches the leafnode.
ln.mu.Lock()
skip := (sub.origin != nil && string(sub.origin) == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || !ln.canSubscribe(string(sub.subject))
ln.mu.Unlock()
// If skipped, make sure that we still let go the "$LDS." subscription that allows
// the detection of a loop.
if skip && bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix)) {
skip = false
}
ln.mu.Unlock()
if skip {
continue
if isLDS || !skip {
ln.updateSmap(sub, delta)
}
ln.updateSmap(sub, delta)
}
}

Expand Down
54 changes: 54 additions & 0 deletions server/norace_test.go
Expand Up @@ -7810,3 +7810,57 @@ func TestNoRaceParallelStreamAndConsumerCreation(t *testing.T) {
t.Fatalf("Expected only one consumer to be really created, got %d out of %d attempts", numConsumers, np)
}
}

// TestNoRaceJetStreamClusterLeafnodeConnectPerf attaches a large fleet of
// single-server leafnodes (one per simulated vehicle) to the CLOUD JetStream
// cluster over websocket. Each leafnode creates a local stream that sources a
// per-vehicle filtered subject from the hub's STATE stream. The test fails if
// bringing up any one vehicle takes longer than two seconds.
func TestNoRaceJetStreamClusterLeafnodeConnectPerf(t *testing.T) {
	// Skipped by default: comment out the skip below to run. Needs to be on a
	// big machine. Do not want as part of Travis tests atm.
	skip(t)

	tmpl := strings.Replace(jsClusterAccountsTempl, "store_dir:", "domain: cloud, store_dir:", 1)
	c := createJetStreamCluster(t, tmpl, "CLOUD", _EMPTY_, 3, 18033, true)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Hub side stream that every vehicle sources from.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "STATE",
		Subjects: []string{"STATE.GLOBAL.CELL1.*.>"},
		Replicas: 3,
	})
	require_NoError(t, err)

	tmpl = strings.Replace(jsClusterTemplWithSingleFleetLeafNode, "store_dir:", "domain: vehicle, store_dir:", 1)

	// connectVehicle brings up the n-th (1-based) vehicle leafnode, creates its
	// sourced stream and verifies the leafnode connection, timing the whole
	// sequence. Running each vehicle in its own function scope lets the
	// deferred Close release the client connection even when an assertion
	// aborts the test.
	connectVehicle := func(n int) {
		start := time.Now()
		vin := fmt.Sprintf("7PDSGAALXNN%06d", n)
		ln := c.createLeafNodeWithTemplateNoSystemWithProto(vin, tmpl, "ws")

		lnc, ljs := jsClientConnect(t, ln)
		defer lnc.Close()

		// Create the sourced stream.
		_, err := ljs.AddStream(&nats.StreamConfig{
			Name:     "VEHICLE",
			Subjects: []string{"STATE.GLOBAL.LOCAL.>"},
			Sources: []*nats.StreamSource{{
				Name:          "STATE",
				FilterSubject: fmt.Sprintf("STATE.GLOBAL.CELL1.%s.>", vin),
				External: &nats.ExternalStream{
					APIPrefix:     "$JS.cloud.API",
					DeliverPrefix: fmt.Sprintf("DELIVER.STATE.GLOBAL.CELL1.%s", vin),
				},
			}},
		})
		require_NoError(t, err)

		checkLeafNodeConnectedCount(t, ln, 1)
		if elapsed := time.Since(start); elapsed > 2*time.Second {
			t.Fatalf("Took too long to create leafnode %d connection: %v", n, elapsed)
		}
	}

	const numVehicles = 500
	for i := 1; i <= numVehicles; i++ {
		connectVehicle(i)
	}
}