Skip to content

Commit

Permalink
fix(sharded): ensure compatibility with ioredis
Browse files Browse the repository at this point in the history
Related: #499
  • Loading branch information
darrachequesne committed May 13, 2023
1 parent 55ce829 commit 42c8ab6
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 88 deletions.
40 changes: 10 additions & 30 deletions lib/sharded-adapter.ts
@@ -1,12 +1,10 @@
import { ClusterAdapter, ClusterMessage, MessageType } from "./cluster-adapter";
import { decode, encode } from "notepack.io";
import { hasBinary, parseNumSubResponse, sumValues } from "./util";
import { hasBinary, PUBSUB, SPUBLISH, SSUBSCRIBE, SUNSUBSCRIBE } from "./util";
import debugModule from "debug";

const debug = debugModule("socket.io-redis");

const RETURN_BUFFERS = true;

export interface ShardedRedisAdapterOptions {
/**
* The prefix for the Redis Pub/Sub channels.
Expand Down Expand Up @@ -78,25 +76,21 @@ class ShardedRedisAdapter extends ClusterAdapter {

const handler = (message, channel) => this.onRawMessage(message, channel);

this.subClient.sSubscribe(this.channel, handler, RETURN_BUFFERS);
this.subClient.sSubscribe(this.responseChannel, handler, RETURN_BUFFERS);
SSUBSCRIBE(this.subClient, this.channel, handler);
SSUBSCRIBE(this.subClient, this.responseChannel, handler);

if (this.opts.subscriptionMode === "dynamic") {
this.on("create-room", (room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
this.subClient.sSubscribe(
this.dynamicChannel(room),
handler,
RETURN_BUFFERS
);
SSUBSCRIBE(this.subClient, this.dynamicChannel(room), handler);
}
});

this.on("delete-room", (room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
this.subClient.sUnsubscribe(this.dynamicChannel(room));
SUNSUBSCRIBE(this.subClient, this.dynamicChannel(room));
}
});
}
Expand All @@ -114,13 +108,13 @@ class ShardedRedisAdapter extends ClusterAdapter {
});
}

return this.subClient.sUnsubscribe(channels);
return SUNSUBSCRIBE(this.subClient, channels);
}

override publishMessage(message) {
const channel = this.computeChannel(message);
debug("publishing message of type %s to %s", message.type, channel);
this.pubClient.sPublish(channel, this.encode(message));
SPUBLISH(this.pubClient, channel, this.encode(message));

return Promise.resolve("");
}
Expand All @@ -147,7 +141,8 @@ class ShardedRedisAdapter extends ClusterAdapter {
override publishResponse(requesterUid, response) {
debug("publishing response of type %s to %s", response.type, requesterUid);

this.pubClient.sPublish(
SPUBLISH(
this.pubClient,
`${this.channel}${requesterUid}#`,
this.encode(response)
);
Expand Down Expand Up @@ -189,21 +184,6 @@ class ShardedRedisAdapter extends ClusterAdapter {
}

override serverCount(): Promise<number> {
if (
this.pubClient.constructor.name === "Cluster" ||
this.pubClient.isCluster
) {
return Promise.all(
this.pubClient.nodes().map((node) => {
return node
.sendCommand(["PUBSUB", "SHARDNUMSUB", this.channel])
.then(parseNumSubResponse);
})
).then(sumValues);
} else {
return this.pubClient
.sendCommand(["PUBSUB", "SHARDNUMSUB", this.channel])
.then(parseNumSubResponse);
}
return PUBSUB(this.pubClient, "SHARDNUMSUB", this.channel);
}
}
86 changes: 86 additions & 0 deletions lib/util.ts
Expand Up @@ -44,3 +44,89 @@ export function sumValues(values) {
return acc + val;
}, 0);
}

const RETURN_BUFFERS = true;

/**
* Whether the client comes from the `redis` package
*
* @param redisClient
*
* @see https://github.com/redis/node-redis
*/
function isRedisV4Client(redisClient: any) {
return typeof redisClient.sSubscribe === "function";
}

export function SSUBSCRIBE(
redisClient: any,
channel: string,
handler: (rawMessage: Buffer, channel: Buffer) => void
) {
if (isRedisV4Client(redisClient)) {
redisClient.sSubscribe(channel, handler, RETURN_BUFFERS);
} else {
redisClient.ssubscribe(channel);

redisClient.on("smessageBuffer", (rawChannel, message) => {
if (rawChannel.toString() === channel) {
handler(message, rawChannel);
}
});
}
}

export function SUNSUBSCRIBE(redisClient: any, channel: string | string[]) {
if (isRedisV4Client(redisClient)) {
redisClient.sUnsubscribe(channel);
} else {
redisClient.sunsubscribe(channel);
}
}

export function SPUBLISH(
redisClient: any,
channel: string,
payload: string | Uint8Array
) {
if (isRedisV4Client(redisClient)) {
redisClient.sPublish(channel, payload);
} else {
redisClient.spublish(channel, payload);
}
}

export function PUBSUB(redisClient: any, arg: string, channel: string) {
if (redisClient.constructor.name === "Cluster" || redisClient.isCluster) {
return Promise.all(
redisClient.nodes().map((node) => {
return node
.sendCommand(["PUBSUB", arg, channel])
.then(parseNumSubResponse);
})
).then(sumValues);
} else if (isRedisV4Client(redisClient)) {
const isCluster = Array.isArray(redisClient.masters);
if (isCluster) {
const nodes = redisClient.masters;
return Promise.all(
nodes.map((node) => {
return node.client
.sendCommand(["PUBSUB", arg, channel])
.then(parseNumSubResponse);
})
).then(sumValues);
} else {
return redisClient
.sendCommand(["PUBSUB", arg, channel])
.then(parseNumSubResponse);
}
} else {
return new Promise((resolve, reject) => {
redisClient.send_command("PUBSUB", [arg, channel], (err, numSub) => {
if (err) return reject(err);
resolve(parseNumSubResponse(numSub));
});
});
}
}

0 comments on commit 42c8ab6

Please sign in to comment.