-
-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
RPCRedis.ts
130 lines (117 loc) · 3.91 KB
/
RPCRedis.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import type { Buffer } from 'node:buffer';
import { clearTimeout, setTimeout } from 'node:timers';
import type { IRPCBroker } from '../Broker.js';
import { DefaultBrokerOptions } from '../Broker.js';
import type { RedisBrokerOptions } from './BaseRedis.js';
import { BaseRedisBroker } from './BaseRedis.js';
interface InternalPromise {
reject(error: any): void;
resolve(data: any): void;
timeout: NodeJS.Timeout;
}
/**
* Options specific for an RPC Redis broker
*/
export interface RPCRedisBrokerOptions extends RedisBrokerOptions {
timeout?: number;
}
/**
* Default values used for the {@link RPCRedisBrokerOptions}
*/
export const DefaultRPCRedisBrokerOptions = {
...DefaultBrokerOptions,
timeout: 5_000,
} as const satisfies Required<Omit<RPCRedisBrokerOptions, 'redisClient'>>;
/**
* RPC broker powered by Redis
*
* @example
* ```ts
* // caller.js
* import { RPCRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new RPCRedisBroker({ redisClient: new Redis() });
*
* console.log(await broker.call('testcall', 'Hello World!'));
* await broker.destroy();
*
* // responder.js
* import { RPCRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new RPCRedisBroker({ redisClient: new Redis() });
* broker.on('testcall', ({ data, ack, reply }) => {
* console.log('responder', data);
* void ack();
* void reply(`Echo: ${data}`);
* });
*
* await broker.subscribe('responders', ['testcall']);
* ```
*/
export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses extends Record<keyof TEvents, any>>
extends BaseRedisBroker<TEvents>
implements IRPCBroker<TEvents, TResponses>
{
/**
* Options this broker is using
*/
protected override readonly options: Required<RPCRedisBrokerOptions>;
protected readonly promises = new Map<string, InternalPromise>();
public constructor(options: RPCRedisBrokerOptions) {
super(options);
this.options = { ...DefaultRPCRedisBrokerOptions, ...options };
this.streamReadClient.on('messageBuffer', (channel: Buffer, message: Buffer) => {
const [, id] = channel.toString().split(':');
if (id && this.promises.has(id)) {
// eslint-disable-next-line @typescript-eslint/unbound-method
const { resolve, timeout } = this.promises.get(id)!;
resolve(this.options.decode(message));
clearTimeout(timeout);
}
});
}
/**
* {@inheritDoc IRPCBroker.call}
*/
public async call<T extends keyof TEvents>(
event: T,
data: TEvents[T],
timeoutDuration: number = this.options.timeout,
): Promise<TResponses[T]> {
const id = await this.options.redisClient.xadd(
event as string,
'*',
BaseRedisBroker.STREAM_DATA_KEY,
this.options.encode(data),
);
// This id! assertion is valid. From redis docs:
// "The command returns a Null reply when used with the NOMKSTREAM option and the key doesn't exist."
// See: https://redis.io/commands/xadd/
const rpcChannel = `${event as string}:${id!}`;
// Construct the error here for better stack traces
const timedOut = new Error(`timed out after ${timeoutDuration}ms`);
await this.streamReadClient.subscribe(rpcChannel);
return new Promise<TResponses[T]>((resolve, reject) => {
const timeout = setTimeout(() => reject(timedOut), timeoutDuration).unref();
this.promises.set(id!, { resolve, reject, timeout });
// eslint-disable-next-line promise/prefer-await-to-then
}).finally(() => {
void this.streamReadClient.unsubscribe(rpcChannel);
this.promises.delete(id!);
});
}
protected emitEvent(id: Buffer, group: string, event: string, data: unknown) {
const payload: { ack(): Promise<void>; data: unknown; reply(data: unknown): Promise<void> } = {
data,
ack: async () => {
await this.options.redisClient.xack(event, group, id);
},
reply: async (data) => {
await this.options.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));
},
};
this.emit(event, payload);
}
}