Skip to content

Commit

Permalink
feat: add two subscription modes for the sharded adapter
Browse files Browse the repository at this point in the history
The subscriptionMode option allows to configure how many Redis Pub/Sub
channels are used:

- "static": 2 channels per namespace

Useful when used with dynamic namespaces.

- "dynamic": (2 + 1 per public room) channels per namespace

The default value, useful when some rooms have a low number of clients
(so only a few Socket.IO servers are notified).

Related:

- #491
- #492
- #493
  • Loading branch information
darrachequesne committed May 2, 2023
1 parent ef51d69 commit d3388bf
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 29 deletions.
102 changes: 90 additions & 12 deletions lib/sharded-adapter.ts
Expand Up @@ -8,9 +8,42 @@ const debug = debugModule("socket.io-redis");
const RETURN_BUFFERS = true;

export interface ShardedRedisAdapterOptions {
/**
* The prefix for the Redis Pub/Sub channels.
*
* @default "socket.io"
*/
channelPrefix?: string;
/**
* The subscription mode impacts the number of Redis Pub/Sub channels:
*
* - "static": 2 channels per namespace
*
* Useful when used with dynamic namespaces.
*
* - "dynamic": (2 + 1 per public room) channels per namespace
*
* The default value, useful when some rooms have a low number of clients (so only a few Socket.IO servers are notified).
*
* Only public rooms (i.e. not related to a particular Socket ID) are taken in account, because:
*
* - a lot of connected clients would mean a lot of subscription/unsubscription
* - the Socket ID attribute is ephemeral
*
* @default "dynamic"
*/
subscriptionMode?: "static" | "dynamic";
}

/**
* Create a new Adapter based on Redis sharded Pub/Sub introduced in Redis 7.0.
*
* @see https://redis.io/docs/manual/pubsub/#sharded-pubsub
*
* @param pubClient - the Redis client used to publish (from the `redis` package)
* @param subClient - the Redis client used to subscribe (from the `redis` package)
* @param opts - some additional options
*/
export function createShardedAdapter(
pubClient: any,
subClient: any,
Expand All @@ -36,6 +69,7 @@ class ShardedRedisAdapter extends ClusterAdapter {
this.opts = Object.assign(
{
channelPrefix: "socket.io",
subscriptionMode: "dynamic",
},
opts
);
Expand All @@ -48,25 +82,69 @@ class ShardedRedisAdapter extends ClusterAdapter {
this.subClient.sSubscribe(this.channel, handler, RETURN_BUFFERS);
this.subClient.sSubscribe(this.responseChannel, handler, RETURN_BUFFERS);

this.cleanup = () => {
return Promise.all([
this.subClient.sUnsubscribe(this.channel, handler),
this.subClient.sUnsubscribe(this.responseChannel, handler),
]);
};
if (this.opts.subscriptionMode === "dynamic") {
this.on("create-room", (room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
this.subClient.sSubscribe(
this.dynamicChannel(room),
handler,
RETURN_BUFFERS
);
}
});

this.on("delete-room", (room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
this.subClient.sUnsubscribe(this.dynamicChannel(room));
}
});
}
}

override close(): Promise<void> | void {
this.cleanup();
const channels = [this.channel, this.responseChannel];

if (this.opts.subscriptionMode === "dynamic") {
this.rooms.forEach((_sids, room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
channels.push(this.dynamicChannel(room));
}
});
}

return this.subClient.sUnsubscribe(channels);
}

override publishMessage(message) {
debug("publishing message of type %s to %s", message.type, this.channel);
this.pubClient.sPublish(this.channel, this.encode(message));
const channel = this.computeChannel(message);
debug("publishing message of type %s to %s", message.type, channel);
this.pubClient.sPublish(channel, this.encode(message));

return Promise.resolve("");
}

private computeChannel(message) {
// broadcast with ack can not use a dynamic channel, because the serverCount() method return the number of all
// servers, not only the ones where the given room exists
const useDynamicChannel =
this.opts.subscriptionMode === "dynamic" &&
message.type === MessageType.BROADCAST &&
message.data.requestId === undefined &&
message.data.opts.rooms.length === 1;
if (useDynamicChannel) {
return this.dynamicChannel(message.data.opts.rooms[0]);
} else {
return this.channel;
}
}

private dynamicChannel(room) {
return this.channel + room + "#";
}

override publishResponse(requesterUid, response) {
debug("publishing response of type %s to %s", response.type, requesterUid);

Expand Down Expand Up @@ -104,10 +182,10 @@ class ShardedRedisAdapter extends ClusterAdapter {
return debug("invalid format: %s", e.message);
}

if (channel.toString() === this.channel) {
this.onMessage(message, "");
} else {
if (channel.toString() === this.responseChannel) {
this.onResponse(message);
} else {
this.onMessage(message);
}
}

Expand Down
30 changes: 15 additions & 15 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -40,7 +40,7 @@
"mocha": "^10.1.0",
"nyc": "^15.1.0",
"prettier": "^2.8.7",
"redis": "^4.6.5",
"redis": "^4.6.6",
"redis-v3": "npm:redis@^3.1.2",
"socket.io": "^4.6.1",
"socket.io-client": "^4.1.1",
Expand Down
5 changes: 4 additions & 1 deletion test/specifics.ts
Expand Up @@ -23,7 +23,10 @@ describe("specifics", () => {
afterEach(() => cleanup());

describe("broadcast", function () {
it("broadcasts to a numeric room", (done) => {
it("broadcasts to a numeric room", function (done) {
if (process.env.SHARDED === "1") {
return this.skip();
}
// @ts-ignore
serverSockets[0].join(123);

Expand Down

0 comments on commit d3388bf

Please sign in to comment.