[FIXED] DQ weighted subscribers across separate leafnode connections. (#4231)

Fix for properly distributing queue requests over multiple leafnode connections.
When a leafnode server joins two accounts in a supercluster, we want to
make sure that each connection properly takes into account the weighted
number of subscribers in each account.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Jun 10, 2023
2 parents 13b6e26 + 2765e53 commit 8c513ad
Showing 3 changed files with 223 additions and 3 deletions.
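
For context, the scenario exercised by this change is a leaf server whose remotes solicit two different accounts, one per cluster of the supercluster, while both connections map into the same local account on the leaf side. The fragment below is an illustrative, hand-expanded version of the remotes block the new test generates; hosts, ports, and credentials are placeholders, not values taken from the commit.

var exampleLeafRemotes = `
leaf {
    remotes [
        # Solicits account ONE (user "one") in the first cluster.
        { urls: [ "nats://one:p@127.0.0.1:7422" ] }
        # Solicits account TWO (user "two") in the second cluster.
        { urls: [ "nats://two:p@127.0.0.1:7432" ] }
    ]
}`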
222 changes: 221 additions & 1 deletion server/leafnode_test.go
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 The NATS Authors
+// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -4846,3 +4846,223 @@ func TestLeafNodeDuplicateMsg(t *testing.T) {
t.Run("sub_b2_pub_a1", func(t *testing.T) { check(t, b2, a1) })
t.Run("sub_b2_pub_a2", func(t *testing.T) { check(t, b2, a2) })
}

func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithSeparateAccounts(t *testing.T) {
sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 2)
defer sc.shutdown()

// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
var lnTmpl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
{{leaf}}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
`

var leafFrag = `
leaf {
listen: 127.0.0.1:-1
remotes [
{ urls: [ %s ] }
{ urls: [ %s ] }
]
}`

// We want to have two leaf node connections that join to the same local account on the leafnode servers,
// but connect to different accounts in different clusters.
c1 := sc.clusters[0] // Will connect to account ONE
c2 := sc.clusters[1] // Will connect to account TWO

genLeafTmpl := func(tmpl string) string {
t.Helper()

var ln1, ln2 []string
for _, s := range c1.servers {
if s.ClusterName() != c1.name {
continue
}
ln := s.getOpts().LeafNode
ln1 = append(ln1, fmt.Sprintf("nats://one:p@%s:%d", ln.Host, ln.Port))
}

for _, s := range c2.servers {
if s.ClusterName() != c2.name {
continue
}
ln := s.getOpts().LeafNode
ln2 = append(ln2, fmt.Sprintf("nats://two:p@%s:%d", ln.Host, ln.Port))
}
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln1, ", "), strings.Join(ln2, ", ")), 1)
}

tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
tmpl = genLeafTmpl(tmpl)

ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
ln.waitOnClusterReady()
defer ln.shutdown()

for _, s := range ln.servers {
checkLeafNodeConnectedCount(t, s, 2)
}

// Now connect DQ subscribers to each cluster and their separate accounts, and make sure we get the right
// behavior, balanced between them when requests originate from the leaf cluster.

// Create 5 clients for each cluster / account
var c1c, c2c []*nats.Conn
for i := 0; i < 5; i++ {
nc1, _ := jsClientConnect(t, c1.randomServer(), nats.UserInfo("one", "p"))
defer nc1.Close()
c1c = append(c1c, nc1)
nc2, _ := jsClientConnect(t, c2.randomServer(), nats.UserInfo("two", "p"))
defer nc2.Close()
c2c = append(c2c, nc2)
}

createSubs := func(num int, conns []*nats.Conn) (subs []*nats.Subscription) {
for i := 0; i < num; i++ {
nc := conns[rand.Intn(len(conns))]
sub, err := nc.QueueSubscribeSync("REQUEST", "MC")
require_NoError(t, err)
subs = append(subs, sub)
nc.Flush()
}
// Let subs propagate.
time.Sleep(100 * time.Millisecond)
return subs
}
closeSubs := func(subs []*nats.Subscription) {
for _, sub := range subs {
sub.Unsubscribe()
}
}

// Simple test first.
subs1 := createSubs(1, c1c)
defer closeSubs(subs1)
subs2 := createSubs(1, c2c)
defer closeSubs(subs2)

sendRequests := func(num int) {
// Now connect to the leaf cluster and send some requests.
nc, _ := jsClientConnect(t, ln.randomServer())
defer nc.Close()

for i := 0; i < num; i++ {
require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
}
nc.Flush()
}

pending := func(subs []*nats.Subscription) (total int) {
for _, sub := range subs {
n, _, err := sub.Pending()
require_NoError(t, err)
total += n
}
return total
}

num := 1000
checkAllReceived := func() error {
total := pending(subs1) + pending(subs2)
if total == num {
return nil
}
return fmt.Errorf("Not all received: %d vs %d", total, num)
}

checkBalanced := func(total, pc1, pc2 int) {
tf := float64(total)
e1 := tf * (float64(pc1) / 100.00)
e2 := tf * (float64(pc2) / 100.00)
delta := tf / 10
p1 := float64(pending(subs1))
if p1 < e1-delta || p1 > e1+delta {
t.Fatalf("Value out of range for subs1, expected %v got %v", e1, p1)
}
p2 := float64(pending(subs2))
if p2 < e2-delta || p2 > e2+delta {
t.Fatalf("Value out of range for subs2, expected %v got %v", e2, p2)
}
}

// Now connect to the leaf cluster and send some requests.

// Simple 50/50
sendRequests(num)
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
checkBalanced(num, 50, 50)

closeSubs(subs1)
closeSubs(subs2)

// Now test unbalanced. 10/90
subs1 = createSubs(1, c1c)
defer closeSubs(subs1)
subs2 = createSubs(9, c2c)
defer closeSubs(subs2)

sendRequests(num)
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
checkBalanced(num, 10, 90)

// Now test draining the subs while we are sending, starting from a balanced situation, to simulate draining a cluster.

closeSubs(subs1)
closeSubs(subs2)
subs1, subs2 = nil, nil

// These subs are slightly different: async handlers that count deliveries per account.
var r1, r2 atomic.Uint64
for i := 0; i < 20; i++ {
nc := c1c[rand.Intn(len(c1c))]
sub, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r1.Add(1) })
require_NoError(t, err)
subs1 = append(subs1, sub)
nc.Flush()

nc = c2c[rand.Intn(len(c2c))]
sub, err = nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r2.Add(1) })
require_NoError(t, err)
subs2 = append(subs2, sub)
nc.Flush()
}
defer closeSubs(subs1)
defer closeSubs(subs2)

nc, _ := jsClientConnect(t, ln.randomServer())
defer nc.Close()

for i, dindex := 0, 1; i < num; i++ {
require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
// Check if we have more subs to drain, simulating a cluster draining.
// Draining completes within roughly the first ~100 requests using the ~1-in-6 rand test below.
// The first and last subs are left in place.
if dindex < len(subs1)-1 && rand.Intn(6) > 4 {
sub := subs1[dindex]
dindex++
sub.Drain()
}
}
nc.Flush()

checkFor(t, time.Second, 200*time.Millisecond, func() error {
total := int(r1.Load() + r2.Load())
if total == num {
return nil
}
return fmt.Errorf("Not all received: %d vs %d", total, num)
})
require_True(t, r2.Load() > r1.Load())
}
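
Taken together, the phases above cover an even 50/50 split, a 10/90 weighted split, and a gradual drain of one side while traffic is flowing. From a nats-server checkout the new test can be run on its own with something along the lines of: go test -run TestLeafNodeWithWeightedDQRequestsToSuperClusterWithSeparateAccounts ./server (the exact invocation is a suggestion, not part of the commit).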
2 changes: 1 addition & 1 deletion server/sublist.go
@@ -615,7 +615,7 @@ func (s *Sublist) reduceCacheCount() {

// Helper function for auto-expanding remote qsubs.
func isRemoteQSub(sub *subscription) bool {
-return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER
+return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF)
}

// UpdateRemoteQSub should be called when we update the weight of an existing
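
The single-line change above is the core of the fix: a queue subscription that arrives over a leafnode connection, like one arriving over a route, represents an aggregate weight of subscribers behind it, and classifying it as a remote qsub lets the sublist auto-expand it by that weight. The standalone sketch below is illustrative only (it is not the server's internal code) and shows why the weight matters when one queue member is picked: a remote qsub of weight 9 should win roughly nine times as often as a single local subscriber, matching the 10/90 case in the test above.

// Conceptual sketch: weight-proportional selection among queue-group members.
package main

import (
	"fmt"
	"math/rand"
)

// qsub models a queue-group member; weight is 1 for a local subscriber and the
// aggregate subscriber count for a remote (route or leafnode) subscription.
type qsub struct {
	name   string
	weight int
}

// pick selects one member with probability proportional to its weight.
func pick(subs []qsub) qsub {
	total := 0
	for _, s := range subs {
		total += s.weight
	}
	n := rand.Intn(total)
	for _, s := range subs {
		if n < s.weight {
			return s
		}
		n -= s.weight
	}
	return subs[len(subs)-1] // not reached
}

func main() {
	// One local subscriber vs. a leafnode-delivered qsub carrying weight 9.
	subs := []qsub{{name: "local", weight: 1}, {name: "leaf", weight: 9}}
	counts := make(map[string]int)
	for i := 0; i < 1000; i++ {
		counts[pick(subs).name]++
	}
	fmt.Println(counts) // roughly map[leaf:900 local:100]
}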
2 changes: 1 addition & 1 deletion test/system_services_test.go
@@ -262,7 +262,7 @@ func TestSystemServiceSubscribersLeafNodesWithoutSystem(t *testing.T) {
// For now we do not see all the details behind a leafnode if the leafnode is not enabled.
checkDbgNumSubs(t, nc, "foo.bar.3", 2)

-checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 11)
+checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 12)
}

func runSolicitLeafServerWithSystemToURL(surl string) (*server.Server, *server.Options) {
