Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Daisy chained leafnodes sometimes would not propagate interest #4207

Merged
merged 1 commit into from Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/accounts.go
Expand Up @@ -958,7 +958,7 @@ func (a *Account) isLeafNodeClusterIsolated(cluster string) bool {
if len(a.leafClusters) > 1 {
return false
}
return a.leafClusters[cluster] > 0
return a.leafClusters[cluster] == uint64(a.nleafs)
}

// Helper function to remove leaf nodes. If number of leafnodes gets large
Expand Down
4 changes: 2 additions & 2 deletions server/events_test.go
Expand Up @@ -436,8 +436,8 @@ func checkLeafNodeConnectedCount(t testing.TB, s *Server, lnCons int) {
t.Helper()
checkFor(t, 5*time.Second, 15*time.Millisecond, func() error {
if nln := s.NumLeafNodes(); nln != lnCons {
return fmt.Errorf("Expected %d connected leafnode(s) for server %q, got %d",
lnCons, s.ID(), nln)
return fmt.Errorf("Expected %d connected leafnode(s) for server %v, got %d",
lnCons, s, nln)
}
return nil
})
Expand Down
158 changes: 158 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4122,3 +4122,161 @@ func TestJetStreamClusterStaleDirectGetOnRestart(t *testing.T) {
t.Fatalf("Expected no errors but got %v", <-errCh)
}
}

// This test mimics a user's setup where there is a cloud cluster/domain, and one for eu and ap that are leafnoded into the
// cloud cluster, and one for cn that is leafnoded into the ap cluster.
// We broke basic connectivity in 2.9.17 from publishing in eu for delivery in cn on same account which is daisy chained through ap.
// We will also test cross account delivery in this test as well.
func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
var cloudTmpl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, domain: CLOUD, store_dir: '%s'}

leaf { listen: 127.0.0.1:-1 }

cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}

accounts {
F {
jetstream: enabled
users = [ { user: "F", pass: "pass" } ]
exports [ { stream: "F.>" } ]
}
T {
jetstream: enabled
users = [ { user: "T", pass: "pass" } ]
imports [ { stream: { account: F, subject: "F.>"} } ]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}`

// Now create the cloud and make sure we are connected.
// Cloud
c := createJetStreamCluster(t, cloudTmpl, "CLOUD", _EMPTY_, 3, 22020, false)
defer c.shutdown()

var lnTmpl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

{{leaf}}

cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}

accounts {
F {
jetstream: enabled
users = [ { user: "F", pass: "pass" } ]
exports [ { stream: "F.>" } ]
}
T {
jetstream: enabled
users = [ { user: "T", pass: "pass" } ]
imports [ { stream: { account: F, subject: "F.>"} } ]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}`

var leafFrag = `
leaf {
listen: 127.0.0.1:-1
remotes [ { urls: [ %s ], account: "T" }, { urls: [ %s ], account: "F" } ]
}`

genLeafTmpl := func(tmpl string, c *cluster) string {
t.Helper()
// Create our leafnode cluster template first.
var lnt, lnf []string
for _, s := range c.servers {
if s.ClusterName() != c.name {
continue
}
ln := s.getOpts().LeafNode
lnt = append(lnt, fmt.Sprintf("nats://T:pass@%s:%d", ln.Host, ln.Port))
lnf = append(lnf, fmt.Sprintf("nats://F:pass@%s:%d", ln.Host, ln.Port))
}
lntc := strings.Join(lnt, ", ")
lnfc := strings.Join(lnf, ", ")
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, lntc, lnfc), 1)
}

// Cluster EU
// Domain is "EU'
tmpl := strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "EU"), 1)
tmpl = genLeafTmpl(tmpl, c)
lceu := createJetStreamCluster(t, tmpl, "EU", "EU-", 3, 22110, false)
lceu.waitOnClusterReady()
defer lceu.shutdown()

for _, s := range lceu.servers {
checkLeafNodeConnectedCount(t, s, 2)
}

// Cluster AP
// Domain is "AP'
tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "AP"), 1)
tmpl = genLeafTmpl(tmpl, c)
lcap := createJetStreamCluster(t, tmpl, "AP", "AP-", 3, 22180, false)
lcap.waitOnClusterReady()
defer lcap.shutdown()

for _, s := range lcap.servers {
checkLeafNodeConnectedCount(t, s, 2)
}

// Cluster CN
// Domain is "CN'
// This one connects to AP, not the cloud hub.
tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "CN"), 1)
tmpl = genLeafTmpl(tmpl, lcap)
lccn := createJetStreamCluster(t, tmpl, "CN", "CN-", 3, 22280, false)
lccn.waitOnClusterReady()
defer lccn.shutdown()

for _, s := range lccn.servers {
checkLeafNodeConnectedCount(t, s, 2)
}

// Now connect to CN on account F and subscribe to data.
nc, _ := jsClientConnect(t, lccn.randomServer(), nats.UserInfo("F", "pass"))
defer nc.Close()
fsub, err := nc.SubscribeSync("F.EU.>")
require_NoError(t, err)

// Same for account T where the import is.
nc, _ = jsClientConnect(t, lccn.randomServer(), nats.UserInfo("T", "pass"))
defer nc.Close()
tsub, err := nc.SubscribeSync("F.EU.>")
require_NoError(t, err)

// Let sub propagate.
time.Sleep(500 * time.Millisecond)

// Now connect to EU on account F and generate data.
nc, _ = jsClientConnect(t, lceu.randomServer(), nats.UserInfo("F", "pass"))
defer nc.Close()

num := 10
for i := 0; i < num; i++ {
err := nc.Publish("F.EU.DATA", []byte(fmt.Sprintf("MSG-%d", i)))
require_NoError(t, err)
}

checkSubsPending(t, fsub, num)
// Since we export and import in each cluster, we will receive 4x.
// First hop from EU -> CLOUD is 1F and 1T
// Second hop from CLOUD -> AP is 1F, 1T and another 1T
// Third hop from AP -> CN is 1F, 1T, 1T and 1T
// Each cluster hop that has the export/import mapping will add another T message copy.
checkSubsPending(t, tsub, num*4)
}