Skip to content

Commit

Permalink
add push message handler registration and make all pubsub use it.
Browse files Browse the repository at this point in the history
  • Loading branch information
sjpotter committed Apr 4, 2024
1 parent 657167c commit e8c0988
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 42 deletions.
65 changes: 49 additions & 16 deletions packages/client/lib/client/commands-queue.ts
Expand Up @@ -2,7 +2,7 @@ import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-l
import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { COMMANDS, ChannelListeners, PUBSUB_TYPE, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply } from '../errors';
import { MonitorCallback } from '.';

Expand Down Expand Up @@ -51,6 +51,8 @@ export default class RedisCommandsQueue {
#chainInExecution: symbol | undefined;
readonly decoder;
readonly #pubSub = new PubSub();
readonly #pushHandlers: Map<string, (pushMsg: Array<any>) => unknown> = new Map();
readonly #builtInSet = new Set<string>;

get isPubSubActive() {
return this.#pubSub.isActive;
Expand All @@ -64,6 +66,21 @@ export default class RedisCommandsQueue {
this.#respVersion = respVersion;
this.#maxLength = maxLength;
this.#onShardedChannelMoved = onShardedChannelMoved;

this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), this.#pubSub.handleMessageReplyChannel.bind(this.#pubSub));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].message.toString(), this.#pubSub.handleMessageReplyPattern.bind(this.#pubSub));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this));

for (const str in this.#pushHandlers.keys) {
this.#builtInSet.add(str);
}

this.decoder = this.#initiateDecoder();
}

Expand All @@ -75,28 +92,44 @@ export default class RedisCommandsQueue {
this.#waitingForReply.shift()!.reject(err);
}

#onPush(push: Array<any>) {
// TODO: type
if (this.#pubSub.handleMessageReply(push)) return true;

const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push);
if (isShardedUnsubscribe && !this.#waitingForReply.length) {
#handleStatusReply(push: Array<any>) {
const head = this.#waitingForReply.head!.value;
if (
(Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
--head.channelsCounter! === 0
) {
this.#waitingForReply.shift()!.resolve();
}
}

#handleShardedUnsubscribe(push: Array<any>) {
if (!this.#waitingForReply.length) {
const channel = push[1].toString();
this.#onShardedChannelMoved(
channel,
this.#pubSub.removeShardedListeners(channel)
);
return true;
} else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) {
const head = this.#waitingForReply.head!.value;
if (
(Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
--head.channelsCounter! === 0
) {
this.#waitingForReply.shift()!.resolve();
}
} else {
this.#handleStatusReply(push);
}
}

addPushHandler(messageType: string, handler: (pushMsg: Array<any>) => unknown) {
if (this.#builtInSet.has(messageType)) {
throw new Error("Cannot override built in push message handler");
}

this.#pushHandlers.set(messageType, handler);
}

#onPush(push: Array<any>) {
const handler = this.#pushHandlers.get(push[0].toString());
if (handler) {
handler(push);
return true;
}

return false;
}

#getTypeMapping() {
Expand Down
49 changes: 23 additions & 26 deletions packages/client/lib/client/pub-sub.ts
Expand Up @@ -11,7 +11,7 @@ export type PUBSUB_TYPE = typeof PUBSUB_TYPE;

export type PubSubType = PUBSUB_TYPE[keyof PUBSUB_TYPE];

const COMMANDS = {
export const COMMANDS = {
[PUBSUB_TYPE.CHANNELS]: {
subscribe: Buffer.from('subscribe'),
unsubscribe: Buffer.from('unsubscribe'),
Expand Down Expand Up @@ -344,32 +344,29 @@ export class PubSub {
return commands;
}

handleMessageReply(reply: Array<Buffer>): boolean {
if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.CHANNELS,
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.PATTERNS,
reply[3],
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.SHARDED,
reply[2],
reply[1]
);
return true;
}
handleMessageReplyChannel(push: Array<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.CHANNELS,
push[2],
push[1]
);
}

return false;
handleMessageReplyPattern(push: Array<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.PATTERNS,
push[3],
push[2],
push[1]
);
}

handleMessageReplySharded(push: Array<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.SHARDED,
push[2],
push[1]
);
}

removeShardedListeners(channel: string): ChannelListeners {
Expand Down

0 comments on commit e8c0988

Please sign in to comment.