[FIXED] Duplicate queue subs over leafnodes with hub stream import/export with possible data loss (#4299)

When a queue subscriber was updated multiple times over a leafnode
connection, we added extra shadow subscriptions, which could become
zombies when the connection went away.

In the case where a leafnode server had multiple queue subscribers in the
same queue group, the hub server would add multiple shadow subs. These
subs were not properly cleaned up and could leave stale connections
associated with them.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Jul 11, 2023
2 parents eb2aa35 + 353d543 commit 97dbd6b
Showing 2 changed files with 186 additions and 3 deletions.
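
Before the diffs, here is a minimal, self-contained Go sketch of the failure mode described in the commit message. This only models the behavior; it is not nats-server code, and the shadowHub type and its fields are invented for illustration. The point is the guard: a repeated LS+ announcement for an existing queue sub should not re-run shadow-subscription setup, mirroring the osub == nil check added in processLeafSub below.

	// Sketch only: models how repeated LS+ updates for the same qsub could
	// pile up shadow subscriptions before the fix. All names here are
	// hypothetical, not nats-server internals.
	package main

	import "fmt"

	type shadowHub struct {
		subs    map[string]bool // existing subs keyed by subject+queue
		shadows int             // shadow subs created for stream imports
	}

	func (h *shadowHub) processLeafSub(subject, queue string, fixed bool) {
		key := subject + " " + queue
		exists := h.subs[key] // plays the role of "osub != nil"
		h.subs[key] = true
		if fixed && exists {
			// Existing qsub: this is just an update (e.g. queue weight);
			// the shadow subs are already in place.
			return
		}
		h.shadows++ // pre-fix, this also ran on every update
	}

	func main() {
		for _, fixed := range []bool{false, true} {
			h := &shadowHub{subs: map[string]bool{}}
			for i := 0; i < 3; i++ { // same qsub announced three times
				h.processLeafSub("RESPONSE", "SA", fixed)
			}
			fmt.Printf("fixed=%v shadow subs=%d\n", fixed, h.shadows)
			// fixed=false shadow subs=3 (zombies when the leaf drops)
			// fixed=true  shadow subs=1
		}
	}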
7 changes: 5 additions & 2 deletions server/leafnode.go
@@ -2101,8 +2101,11 @@ func (c *client) processLeafSub(argo []byte) (err error) {
	spoke := c.isSpokeLeafNode()
	c.mu.Unlock()

-	if err := c.addShadowSubscriptions(acc, sub); err != nil {
-		c.Errorf(err.Error())
+	// Only add in shadow subs if a new sub or qsub.
+	if osub == nil {
+		if err := c.addShadowSubscriptions(acc, sub); err != nil {
+			c.Errorf(err.Error())
+		}
	}

	// If we are not solicited, treat leaf node subscriptions similar to a
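
As the new comment indicates, a non-nil osub means the server already had this subscription registered from the leafnode connection, so the LS+ is merely an update of an existing sub or qsub (typically a queue-weight change); the shadow subscriptions that mirror it into importing accounts already exist, and skipping addShadowSubscriptions in that case is what prevents the duplicates described above.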
182 changes: 181 additions & 1 deletion server/leafnode_test.go
@@ -5111,7 +5111,7 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t
	sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 5, 2)
	defer sc.shutdown()

-	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
+	// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, STL and KSC.
	var lnTmpl = `
	listen: 127.0.0.1:-1
	server_name: %s
@@ -5381,3 +5381,183 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t

	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)
}

func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *testing.T) {
	var tmpl = `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
	leaf { listen: 127.0.0.1:-1 }
	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}
	accounts {
		EFG {
			users = [ { user: "efg", pass: "p" } ]
			jetstream: enabled
			exports [ { stream: "RESPONSE" } ]
		}
		STL {
			users = [ { user: "stl", pass: "p" } ]
			imports [ { stream: { account: EFG, subject: "RESPONSE"} } ]
		}
		KSC {
			users = [ { user: "ksc", pass: "p" } ]
			imports [ { stream: { account: EFG, subject: "RESPONSE"} } ]
		}
		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
	}`

	c := createJetStreamClusterWithTemplate(t, tmpl, "US-CENTRAL", 3)
	defer c.shutdown()

	// Now create a leafnode cluster that connects to the hub cluster on account KSC.
	var lnTmpl = `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
	{{leaf}}
	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}
	accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
	`

	var leafFrag = `
	leaf {
		listen: 127.0.0.1:-1
		remotes [ { urls: [ %s ] } ]
	}`

	genLeafTmpl := func(tmpl string) string {
		t.Helper()

		var ln []string
		for _, s := range c.servers {
			lno := s.getOpts().LeafNode
			ln = append(ln, fmt.Sprintf("nats://ksc:p@%s:%d", lno.Host, lno.Port))
		}
		return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln, ", ")), 1)
	}

	tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
	tmpl = genLeafTmpl(tmpl)

	ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
	ln.waitOnClusterReady()
	defer ln.shutdown()

	for _, s := range ln.servers {
		checkLeafNodeConnectedCount(t, s, 1)
	}

	// Create 4 queue subscribers.
	var rsubs []*nats.Subscription

	closeSubs := func(subs []*nats.Subscription) {
		for _, sub := range subs {
			sub.Unsubscribe()
		}
	}

	checkAllRespReceived := func() error {
		t.Helper()
		var total int
		for _, sub := range rsubs {
			n, _, err := sub.Pending()
			require_NoError(t, err)
			total += n
		}
		if total == 100 {
			return nil
		}
		return fmt.Errorf("Not all responses received: %d vs %d", total, 100)
	}

	s := ln.randomServer()
	for i := 0; i < 4; i++ {
		nc, _ := jsClientConnect(t, s)
		defer nc.Close()
		sub, err := nc.QueueSubscribeSync("RESPONSE", "SA")
		require_NoError(t, err)
		nc.Flush()
		rsubs = append(rsubs, sub)
	}

	// Now connect and send responses from EFG on the hub cluster.
	nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("efg", "p"))
	for i := 0; i < 100; i++ {
		require_NoError(t, nc.Publish("RESPONSE", []byte("OK")))
	}
	nc.Flush()

	// Make sure all received.
	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)

	checkAccountInterest := func(s *Server, accName string) *SublistResult {
		t.Helper()
		acc, err := s.LookupAccount(accName)
		require_NoError(t, err)
		acc.mu.RLock()
		r := acc.sl.Match("RESPONSE")
		acc.mu.RUnlock()
		return r
	}

	checkInterest := func() error {
		t.Helper()
		for _, s := range c.servers {
			if r := checkAccountInterest(s, "KSC"); len(r.psubs)+len(r.qsubs) > 0 {
				return fmt.Errorf("Subs still present for %q: %+v", "KSC", r)
			}
			if r := checkAccountInterest(s, "EFG"); len(r.psubs)+len(r.qsubs) > 0 {
				return fmt.Errorf("Subs still present for %q: %+v", "EFG", r)
			}
		}
		return nil
	}

	// Now unsub them and create new ones.
	closeSubs(rsubs)
	rsubs = rsubs[:0]

	// Also restart the server that we had all the rsubs on.
	s.Shutdown()
	s.WaitForShutdown()
	s = ln.restartServer(s)
	ln.waitOnClusterReady()
	ln.waitOnServerCurrent(s)

	checkFor(t, time.Second, 200*time.Millisecond, checkInterest)

	for i := 0; i < 4; i++ {
		nc, _ := jsClientConnect(t, s)
		defer nc.Close()
		sub, err := nc.QueueSubscribeSync("RESPONSE", "SA")
		require_NoError(t, err)
		nc.Flush()
		rsubs = append(rsubs, sub)
	}

	for i := 0; i < 100; i++ {
		require_NoError(t, nc.Publish("RESPONSE", []byte("OK")))
	}
	nc.Flush()

	// Make sure all received.
	checkFor(t, time.Second, 200*time.Millisecond, checkAllRespReceived)

	closeSubs(rsubs)
	checkFor(t, time.Second, 200*time.Millisecond, checkInterest)
}
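
To exercise just this new test locally, standard Go tooling should suffice, e.g. go test -race -run TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub ./server/ from a nats-server checkout (the -race flag is optional, just customary for these tests).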
