Skip to content

Commit

Permalink
[FIXED] Make sure to not forward a message across a route for routed …
Browse files Browse the repository at this point in the history
…dq subs (#4578)

Mimic same behavior for normal subs.

Note that when a queue subscription is behind both a spoke leafnode
connection and a service import the message will be delivered over the
leafnode since service imports are binary signals that are just on. Need
a more thorough investigation for a proper fix. For now its best to not
have the service import on the spoke leafnode such that the raw queue
sub's information if relayed across the leafnode.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves #4367
  • Loading branch information
derekcollison committed Sep 24, 2023
2 parents 637d8f2 + 13dcf31 commit fe2c116
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
6 changes: 6 additions & 0 deletions server/client.go
Expand Up @@ -4558,6 +4558,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}

// If we are a spoke leaf node make sure to not forward across routes.
// This mimics same behavior for normal subs above.
if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER {
continue
}

// We have taken care of preferring local subs for a message from a route above.
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
Expand Down
74 changes: 74 additions & 0 deletions server/leafnode_test.go
Expand Up @@ -7126,5 +7126,79 @@ func TestLeafNodeSlowConsumer(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
t.Fatalf("Timed out waiting for slow consumer leafnodes, got: %v, expected: %v", got, expected)
}

// https://github.com/nats-io/nats-server/issues/4367
func TestLeafNodeDQMultiAccountExportImport(t *testing.T) {
bConf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
server_name: cluster-b-0
accounts {
$SYS: { users: [ { user: admin, password: pwd } ] },
AGG: {
exports: [ { service: "PING.>" } ]
users: [ { user: agg, password: agg } ]
}
}
leaf { listen: 127.0.0.1:-1 }
`))

sb, ob := RunServerWithConfig(bConf)
defer sb.Shutdown()

tmpl := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: { store_dir: '%s' }
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
$SYS: { users: [ { user: admin, password: pwd } ] },
A: {
mappings: { "A.>" : ">" }
exports: [ { service: A.> } ]
users: [ { user: a, password: a } ]
},
AGG: {
imports: [ { service: { subject: A.>, account: A } } ]
users: [ { user: agg, password: agg } ]
},
}
leaf {
remotes: [ {
urls: [ nats-leaf://agg:agg@127.0.0.1:{LEAF_PORT} ]
account: AGG
} ]
}
`
tmpl = strings.Replace(tmpl, "{LEAF_PORT}", fmt.Sprintf("%d", ob.LeafNode.Port), 1)
c := createJetStreamCluster(t, tmpl, "cluster-a", "cluster-a-", 3, 22110, false)
defer c.shutdown()

// Make sure all servers are connected via leafnode to the hub, the b server.
for _, s := range c.servers {
checkLeafNodeConnectedCount(t, s, 1)
}

// Connect to a server in the cluster and create a DQ listener.
nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "a"))
defer nc.Close()

var got atomic.Int32

natsQueueSub(t, nc, "PING", "Q", func(m *nats.Msg) {
got.Add(1)
m.Respond([]byte("REPLY"))
})

// Now connect to B and send the request.
ncb, _ := jsClientConnect(t, sb, nats.UserInfo("agg", "agg"))
defer ncb.Close()

_, err := ncb.Request("A.PING", []byte("REQUEST"), time.Second)
require_NoError(t, err)
require_Equal(t, got.Load(), 1)
}

0 comments on commit fe2c116

Please sign in to comment.