Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Make sure to not forward a message across a route for routed dq subs #4578

Merged
merged 1 commit into from Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/client.go
Expand Up @@ -4558,6 +4558,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}

// If we are a spoke leaf node make sure to not forward across routes.
// This mimics same behavior for normal subs above.
if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER {
continue
}

// We have taken care of preferring local subs for a message from a route above.
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
Expand Down
74 changes: 74 additions & 0 deletions server/leafnode_test.go
Expand Up @@ -7126,5 +7126,79 @@ func TestLeafNodeSlowConsumer(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
t.Fatalf("Timed out waiting for slow consumer leafnodes, got: %v, expected: %v", got, expected)
}

// https://github.com/nats-io/nats-server/issues/4367
func TestLeafNodeDQMultiAccountExportImport(t *testing.T) {
bConf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
server_name: cluster-b-0
accounts {
$SYS: { users: [ { user: admin, password: pwd } ] },
AGG: {
exports: [ { service: "PING.>" } ]
users: [ { user: agg, password: agg } ]
}
}
leaf { listen: 127.0.0.1:-1 }
`))

sb, ob := RunServerWithConfig(bConf)
defer sb.Shutdown()

tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: { store_dir: '%s' }
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
$SYS: { users: [ { user: admin, password: pwd } ] },
A: {
mappings: { "A.>" : ">" }
exports: [ { service: A.> } ]
users: [ { user: a, password: a } ]
},
AGG: {
imports: [ { service: { subject: A.>, account: A } } ]
users: [ { user: agg, password: agg } ]
},
}
leaf {
remotes: [ {
urls: [ nats-leaf://agg:agg@127.0.0.1:{LEAF_PORT} ]
account: AGG
} ]
}
`
tmpl = strings.Replace(tmpl, "{LEAF_PORT}", fmt.Sprintf("%d", ob.LeafNode.Port), 1)
c := createJetStreamCluster(t, tmpl, "cluster-a", "cluster-a-", 3, 22110, false)
defer c.shutdown()

// Make sure all servers are connected via leafnode to the hub, the b server.
for _, s := range c.servers {
checkLeafNodeConnectedCount(t, s, 1)
}

// Connect to a server in the cluster and create a DQ listener.
nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "a"))
defer nc.Close()

var got atomic.Int32

natsQueueSub(t, nc, "PING", "Q", func(m *nats.Msg) {
got.Add(1)
m.Respond([]byte("REPLY"))
})

// Now connect to B and send the request.
ncb, _ := jsClientConnect(t, sb, nats.UserInfo("agg", "agg"))
defer ncb.Close()

_, err := ncb.Request("A.PING", []byte("REQUEST"), time.Second)
require_NoError(t, err)
require_Equal(t, got.Load(), 1)
}