[FIXED] DQ weighted subscribers across separate leafnode connections. #4231

Merged · 2 commits · Jun 10, 2023
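The PR carries no description beyond the diff, so the following is a hedged, client-side sketch of the behavior the change is meant to fix, distilled from the new test in server/leafnode_test.go below: queue ("MC") subscribers bound to two different accounts, each reached over its own leafnode connection, should receive requests published from the leaf cluster roughly in proportion to their member counts. All URLs, credentials, and host names in the sketch are placeholders, and it uses the public nats.go client rather than the test harness.

// Sketch only, distilled from the new test below; placeholders throughout.
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// One queue subscriber in account ONE, reached through the first leafnode remote...
	ncOne, err := nats.Connect("nats://one:p@cluster-one.example.net:4222")
	if err != nil {
		panic(err)
	}
	defer ncOne.Close()
	subOne, _ := ncOne.QueueSubscribeSync("REQUEST", "MC")

	// ...and nine queue subscribers in account TWO, reached through the second remote.
	ncTwo, err := nats.Connect("nats://two:p@cluster-two.example.net:4222")
	if err != nil {
		panic(err)
	}
	defer ncTwo.Close()
	var subsTwo []*nats.Subscription
	for i := 0; i < 9; i++ {
		s, _ := ncTwo.QueueSubscribeSync("REQUEST", "MC")
		subsTwo = append(subsTwo, s)
	}
	ncOne.Flush()
	ncTwo.Flush()
	time.Sleep(100 * time.Millisecond) // let queue interest propagate over both leafnode connections

	// Publish from the leaf cluster, where both remotes bind into the same local account.
	ncLeaf, err := nats.Connect("nats://leaf.example.net:4222")
	if err != nil {
		panic(err)
	}
	defer ncLeaf.Close()
	for i := 0; i < 1000; i++ {
		ncLeaf.Publish("REQUEST", []byte("HELP"))
	}
	ncLeaf.Flush()
	time.Sleep(time.Second)

	// With the fix, deliveries should track the 1:9 member split (~100 vs ~900)
	// rather than ignoring the weights behind the separate leafnode connections.
	one, _, _ := subOne.Pending()
	two := 0
	for _, s := range subsTwo {
		n, _, _ := s.Pending()
		two += n
	}
	fmt.Printf("ONE received %d, TWO received %d\n", one, two)
}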
222 changes: 221 additions & 1 deletion server/leafnode_test.go
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -4846,3 +4846,223 @@ func TestLeafNodeDuplicateMsg(t *testing.T) {
t.Run("sub_b2_pub_a1", func(t *testing.T) { check(t, b2, a1) })
t.Run("sub_b2_pub_a2", func(t *testing.T) { check(t, b2, a2) })
}

func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithSeparateAccounts(t *testing.T) {
sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 2)
defer sc.shutdown()

// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
var lnTmpl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

{{leaf}}

cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}

accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
`

var leafFrag = `
leaf {
listen: 127.0.0.1:-1
remotes [
{ urls: [ %s ] }
{ urls: [ %s ] }
]
}`

// We want to have two leaf node connections that join to the same local account on the leafnode servers,
// but connect to different accounts in different clusters.
c1 := sc.clusters[0] // Will connect to account ONE
c2 := sc.clusters[1] // Will connect to account TWO

genLeafTmpl := func(tmpl string) string {
t.Helper()

var ln1, ln2 []string
for _, s := range c1.servers {
if s.ClusterName() != c1.name {
continue
}
ln := s.getOpts().LeafNode
ln1 = append(ln1, fmt.Sprintf("nats://one:p@%s:%d", ln.Host, ln.Port))
}

for _, s := range c2.servers {
if s.ClusterName() != c2.name {
continue
}
ln := s.getOpts().LeafNode
ln2 = append(ln2, fmt.Sprintf("nats://two:p@%s:%d", ln.Host, ln.Port))
}
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln1, ", "), strings.Join(ln2, ", ")), 1)
}

tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
tmpl = genLeafTmpl(tmpl)

ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
ln.waitOnClusterReady()
defer ln.shutdown()

for _, s := range ln.servers {
checkLeafNodeConnectedCount(t, s, 2)
}

// Now connect DQ subscribers to each cluster and their separate accounts, and make sure we get the right behavior,
// balanced between them when requests originate from the leaf cluster.

// Create 5 clients for each cluster / account
var c1c, c2c []*nats.Conn
for i := 0; i < 5; i++ {
nc1, _ := jsClientConnect(t, c1.randomServer(), nats.UserInfo("one", "p"))
defer nc1.Close()
c1c = append(c1c, nc1)
nc2, _ := jsClientConnect(t, c2.randomServer(), nats.UserInfo("two", "p"))
defer nc2.Close()
c2c = append(c2c, nc2)
}

createSubs := func(num int, conns []*nats.Conn) (subs []*nats.Subscription) {
for i := 0; i < num; i++ {
nc := conns[rand.Intn(len(conns))]
sub, err := nc.QueueSubscribeSync("REQUEST", "MC")
require_NoError(t, err)
subs = append(subs, sub)
nc.Flush()
}
// Let subs propagate.
time.Sleep(100 * time.Millisecond)
return subs
}
closeSubs := func(subs []*nats.Subscription) {
for _, sub := range subs {
sub.Unsubscribe()
}
}

// Simple test first.
subs1 := createSubs(1, c1c)
defer closeSubs(subs1)
subs2 := createSubs(1, c2c)
defer closeSubs(subs2)

sendRequests := func(num int) {
// Now connect to the leaf cluster and send some requests.
nc, _ := jsClientConnect(t, ln.randomServer())
defer nc.Close()

for i := 0; i < num; i++ {
require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
}
nc.Flush()
}

pending := func(subs []*nats.Subscription) (total int) {
for _, sub := range subs {
n, _, err := sub.Pending()
require_NoError(t, err)
total += n
}
return total
}

num := 1000
checkAllReceived := func() error {
total := pending(subs1) + pending(subs2)
if total == num {
return nil
}
return fmt.Errorf("Not all received: %d vs %d", total, num)
}

// checkBalanced verifies that subs1 and subs2 hold pc1% and pc2% of the total
// pending messages, within a tolerance of 10% of the total.
checkBalanced := func(total, pc1, pc2 int) {
tf := float64(total)
e1 := tf * (float64(pc1) / 100.00)
e2 := tf * (float64(pc2) / 100.00)
delta := tf / 10
p1 := float64(pending(subs1))
if p1 < e1-delta || p1 > e1+delta {
t.Fatalf("Value out of range for subs1, expected %v got %v", e1, p1)
}
p2 := float64(pending(subs2))
if p2 < e2-delta || p2 > e2+delta {
t.Fatalf("Value out of range for subs2, expected %v got %v", e2, p2)
}
}

// Now connect to the leaf cluster and send some requests.

// Simple 50/50
sendRequests(num)
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
checkBalanced(num, 50, 50)

closeSubs(subs1)
closeSubs(subs2)

// Now test unbalanced. 10/90
subs1 = createSubs(1, c1c)
defer closeSubs(subs1)
subs2 = createSubs(9, c2c)
defer closeSubs(subs2)

sendRequests(num)
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
checkBalanced(num, 10, 90)

// Now test draining the subs while we are sending, starting from a balanced situation, to simulate draining a cluster.

closeSubs(subs1)
closeSubs(subs2)
subs1, subs2 = nil, nil

// These subs are slightly different: they count deliveries via callbacks instead of tracking pending messages.
var r1, r2 atomic.Uint64
for i := 0; i < 20; i++ {
nc := c1c[rand.Intn(len(c1c))]
sub, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r1.Add(1) })
require_NoError(t, err)
subs1 = append(subs1, sub)
nc.Flush()

nc = c2c[rand.Intn(len(c2c))]
sub, err = nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r2.Add(1) })
require_NoError(t, err)
subs2 = append(subs2, sub)
nc.Flush()
}
defer closeSubs(subs1)
defer closeSubs(subs2)

nc, _ := jsClientConnect(t, ln.randomServer())
defer nc.Close()

for i, dindex := 0, 1; i < num; i++ {
require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
// Check if we have more to simulate draining.
// Will drain within first ~100 requests using 20% rand test below.
// Will leave 1 behind.
if dindex < len(subs1)-1 && rand.Intn(6) > 4 {
sub := subs1[dindex]
dindex++
sub.Drain()
}
}
nc.Flush()

checkFor(t, time.Second, 200*time.Millisecond, func() error {
total := int(r1.Load() + r2.Load())
if total == num {
return nil
}
return fmt.Errorf("Not all received: %d vs %d", total, num)
})
require_True(t, r2.Load() > r1.Load())
}
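For readers parsing the templating above: genLeafTmpl simply joins the leafnode URLs of each cluster and substitutes them into the {{leaf}} placeholder, giving one remote per hub account. The standalone sketch below reproduces that expansion outside the test harness; the URLs and ports are placeholders (the real test uses randomly assigned leafnode ports).

// Illustration only: how the {{leaf}} placeholder is expanded. Not part of the PR.
package main

import (
	"fmt"
	"strings"
)

const leafFragExample = `
leaf {
	listen: 127.0.0.1:-1
	remotes [
		{ urls: [ %s ] }
		{ urls: [ %s ] }
	]
}`

func main() {
	// Placeholder URLs: one remote authenticates into account ONE, the other into TWO.
	ln1 := []string{"nats://one:p@127.0.0.1:26111", "nats://one:p@127.0.0.1:26112", "nats://one:p@127.0.0.1:26113"}
	ln2 := []string{"nats://two:p@127.0.0.1:26211", "nats://two:p@127.0.0.1:26212", "nats://two:p@127.0.0.1:26213"}
	tmpl := "listen: 127.0.0.1:-1\n{{leaf}}\n"
	out := strings.Replace(tmpl, "{{leaf}}",
		fmt.Sprintf(leafFragExample, strings.Join(ln1, ", "), strings.Join(ln2, ", ")), 1)
	fmt.Println(out) // prints the leaf block with both remotes filled in
}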
2 changes: 1 addition & 1 deletion server/sublist.go
@@ -615,7 +615,7 @@ func (s *Sublist) reduceCacheCount() {

// Helper function for auto-expanding remote qsubs.
func isRemoteQSub(sub *subscription) bool {
return sub != nil && sub.queue != nil && sub.client != nil && sub.client.kind == ROUTER
return sub != nil && sub.queue != nil && sub.client != nil && (sub.client.kind == ROUTER || sub.client.kind == LEAF)
}

// UpdateRemoteQSub should be called when we update the weight of an existing
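The one-line change to isRemoteQSub above is the core of the fix: queue subscriptions arriving over LEAF connections are now treated like ROUTER ones, so the weight they advertise participates in the sublist's auto-expansion of remote qsubs. The snippet below is only a conceptual illustration of why that weight matters for distribution; it is not the server's sublist code, and the member names and weights are made up.

// Conceptual sketch: a queue group whose remote members (routes, and after this
// change leafnodes too) carry a weight equal to the number of subscribers behind
// that connection. Picking a member proportionally to weight is what expanding
// remote qsubs buys.
package main

import (
	"fmt"
	"math/rand"
)

type qMember struct {
	name   string // e.g. a local sub or a remote (route/leaf) connection
	weight int    // 1 for a local sub, N for a remote carrying N subscribers
}

// pickWeighted selects one member with probability proportional to its weight.
func pickWeighted(members []qMember) qMember {
	total := 0
	for _, m := range members {
		total += m.weight
	}
	n := rand.Intn(total)
	for _, m := range members {
		if n < m.weight {
			return m
		}
		n -= m.weight
	}
	return members[len(members)-1]
}

func main() {
	// One subscriber reachable via the leafnode to account ONE,
	// nine reachable via the leafnode to account TWO (the 10/90 case in the test).
	group := []qMember{
		{name: "leaf->ONE", weight: 1},
		{name: "leaf->TWO", weight: 9},
	}
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		counts[pickWeighted(group).name]++
	}
	fmt.Println(counts) // roughly 100 vs 900
}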
2 changes: 1 addition & 1 deletion test/system_services_test.go
@@ -262,7 +262,7 @@ func TestSystemServiceSubscribersLeafNodesWithoutSystem(t *testing.T) {
// For now we do not see all the details behind a leafnode if the leafnode is not enabled.
checkDbgNumSubs(t, nc, "foo.bar.3", 2)

checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 11)
checkDbgNumSubs(t, nc, "foo.bar.baz QG.22", 12)
}

func runSolicitLeafServerWithSystemToURL(surl string) (*server.Server, *server.Options) {