[FIXED] Duplicate queue subs over leafnodes with hub stream import/export with possible data loss #4299

Merged 1 commit on Jul 11, 2023
7 changes: 5 additions & 2 deletions server/leafnode.go
@@ -2101,8 +2101,11 @@ func (c *client) processLeafSub(argo []byte) (err error) {
	spoke := c.isSpokeLeafNode()
	c.mu.Unlock()

-	if err := c.addShadowSubscriptions(acc, sub); err != nil {
-		c.Errorf(err.Error())
+	// Only add in shadow subs if a new sub or qsub.
+	if osub == nil {
+		if err := c.addShadowSubscriptions(acc, sub); err != nil {
+			c.Errorf(err.Error())
+		}
	}

	// If we are not solicited, treat leaf node subscriptions similar to a
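For context, a minimal standalone sketch of the dedup idea behind this change (hypothetical types and names, not the server's actual data structures): shadow subscriptions for imports are only materialized when the incoming subscription does not match an existing sub/qsub (osub == nil in the real code), so a queue subscription re-sent over a leafnode no longer registers interest twice.

package main

import "fmt"

// subEntry models a (subject, queue) subscription key; purely illustrative.
type subEntry struct{ subject, queue string }

// registry tracks subs and the shadow interest created for stream imports.
type registry struct {
	subs    map[subEntry]bool
	shadows map[subEntry]int // count of shadow registrations per key
}

// processSub mirrors the shape of the fix: shadow interest is added only
// when no matching sub/qsub already exists.
func (r *registry) processSub(subject, queue string) {
	key := subEntry{subject, queue}
	existing := r.subs[key] // analogous to looking up osub
	r.subs[key] = true
	if !existing {
		// Only add in shadow subs if a new sub or qsub.
		r.shadows[key]++
	}
}

func main() {
	r := &registry{subs: map[subEntry]bool{}, shadows: map[subEntry]int{}}
	// The same queue subscription arriving twice now yields a single
	// shadow registration instead of two.
	r.processSub("RESPONSE", "SA")
	r.processSub("RESPONSE", "SA")
	fmt.Println(r.shadows[subEntry{"RESPONSE", "SA"}]) // prints 1
}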
182 changes: 181 additions & 1 deletion server/leafnode_test.go
@@ -5111,7 +5111,7 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t *testing.T) {
	sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 5, 2)
	defer sc.shutdown()

-	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
+	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, STL and KSC.
	var lnTmpl = `
	listen: 127.0.0.1:-1
	server_name: %s
@@ -5381,3 +5381,183 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t *testing.T) {

	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)
}

func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *testing.T) {
	var tmpl = `
	listen: 127.0.0.1:-1

	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

	leaf { listen: 127.0.0.1:-1 }

	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}

	accounts {
		EFG {
			users = [ { user: "efg", pass: "p" } ]
			jetstream: enabled
			exports [ { stream: "RESPONSE" } ]
		}
		STL {
			users = [ { user: "stl", pass: "p" } ]
			imports [ { stream: { account: EFG, subject: "RESPONSE"} } ]
		}
		KSC {
			users = [ { user: "ksc", pass: "p" } ]
			imports [ { stream: { account: EFG, subject: "RESPONSE"} } ]
		}
		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
	}`

	c := createJetStreamClusterWithTemplate(t, tmpl, "US-CENTRAL", 3)
	defer c.shutdown()

	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, STL and KSC.
	var lnTmpl = `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}

	{{leaf}}

	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}

	accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
	`

	var leafFrag = `
	leaf {
		listen: 127.0.0.1:-1
		remotes [ { urls: [ %s ] } ]
	}`

	genLeafTmpl := func(tmpl string) string {
		t.Helper()

		var ln []string
		for _, s := range c.servers {
			lno := s.getOpts().LeafNode
			ln = append(ln, fmt.Sprintf("nats://ksc:p@%s:%d", lno.Host, lno.Port))
		}
		return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln, ", ")), 1)
	}

	tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
	tmpl = genLeafTmpl(tmpl)

	ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
	ln.waitOnClusterReady()
	defer ln.shutdown()

	for _, s := range ln.servers {
		checkLeafNodeConnectedCount(t, s, 1)
	}

	// Create our queue subscribers.
	var rsubs []*nats.Subscription

	closeSubs := func(subs []*nats.Subscription) {
		for _, sub := range subs {
			sub.Unsubscribe()
		}
	}

	checkAllRespReceived := func() error {
		t.Helper()
		var total int
		for _, sub := range rsubs {
			n, _, err := sub.Pending()
			require_NoError(t, err)
			total += n
		}
		if total == 100 {
			return nil
		}
		return fmt.Errorf("Not all responses received: %d vs %d", total, 100)
	}

	s := ln.randomServer()
	for i := 0; i < 4; i++ {
		nc, _ := jsClientConnect(t, s)
		defer nc.Close()
		sub, err := nc.QueueSubscribeSync("RESPONSE", "SA")
		require_NoError(t, err)
		nc.Flush()
		rsubs = append(rsubs, sub)
	}

	// Now connect and send responses from EFG in cloud.
	nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("efg", "p"))
	for i := 0; i < 100; i++ {
		require_NoError(t, nc.Publish("RESPONSE", []byte("OK")))
	}
	nc.Flush()

	// Make sure all received.
	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)

	checkAccountInterest := func(s *Server, accName string) *SublistResult {
		t.Helper()
		acc, err := s.LookupAccount(accName)
		require_NoError(t, err)
		acc.mu.RLock()
		r := acc.sl.Match("RESPONSE")
		acc.mu.RUnlock()
		return r
	}

	checkInterest := func() error {
		t.Helper()
		for _, s := range c.servers {
			if r := checkAccountInterest(s, "KSC"); len(r.psubs)+len(r.qsubs) > 0 {
				return fmt.Errorf("Subs still present for %q: %+v", "KSC", r)
			}
			if r := checkAccountInterest(s, "EFG"); len(r.psubs)+len(r.qsubs) > 0 {
				return fmt.Errorf("Subs still present for %q: %+v", "EFG", r)
			}
		}
		return nil
	}

	// Now unsub them and create new ones on a different server.
	closeSubs(rsubs)
	rsubs = rsubs[:0]

	// Also restart the server that we had all the rsubs on.
	s.Shutdown()
	s.WaitForShutdown()
	s = ln.restartServer(s)
	ln.waitOnClusterReady()
	ln.waitOnServerCurrent(s)

	checkFor(t, time.Second, 200*time.Millisecond, checkInterest)

	for i := 0; i < 4; i++ {
		nc, _ := jsClientConnect(t, s)
		defer nc.Close()
		sub, err := nc.QueueSubscribeSync("RESPONSE", "SA")
		require_NoError(t, err)
		nc.Flush()
		rsubs = append(rsubs, sub)
	}

	for i := 0; i < 100; i++ {
		require_NoError(t, nc.Publish("RESPONSE", []byte("OK")))
	}
	nc.Flush()

	// Make sure all received.
	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)

	closeSubs(rsubs)
	checkFor(t, time.Second, 200*time.Millisecond, checkInterest)
}
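To run only this new test locally, something along these lines should work from the repository root (assuming a standard Go toolchain; the clustered servers in the test need enough open file descriptors):

	go test ./server -run TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub -v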