/
redis-emitter.js
135 lines (113 loc) · 3.88 KB
/
redis-emitter.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
const redis = require('redis')
const { EventEmitter } = require('node:events')
/**
 * This module simulates the built-in events.EventEmitter, but delivers events through redis.
 * This is useful when companion is running as multiple instances and events need
 * to be distributed across all of them.
 */
module.exports = (redisUrl, redisPubSubScope) => {
// Channel-name prefix: `<scope>:` when a pub/sub scope is supplied, otherwise empty.
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
// Maps a local event name to the (possibly prefixed) name used on redis.
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
// Client used by `emit` to publish events.
const publisher = redis.createClient({ url: redisUrl })
// Second client, used for pSubscribe/pUnsubscribe; only assigned once `publisher` has connected.
// NOTE(review): presumably a separate client because a redis connection in subscriber mode
// cannot issue regular commands — confirm against the node-redis pub/sub documentation.
let subscriber
// Resolves after the publisher, and then the duplicated subscriber, have both connected.
const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
return subscriber.connect()
})
// eventName -> (user handler -> wrapper actually subscribed on redis); filled by `addListener`.
const handlersByEvent = new Map()
// Local emitter backing the 'error' listeners (see on/once/removeListener) and `handleError`.
const errorEmitter = new EventEmitter()
// Reports a failure to whoever listens for 'error' on this emitter.
const handleError = (err) => errorEmitter.emit('error', err)
// Attached right away so a failed initial connection is reported as an 'error' event.
connectedPromise.catch((err) => handleError(err))
/**
 * Run `fn` after the redis connection has been established.
 *
 * Anything that goes wrong (the connection failing, `fn` throwing or
 * rejecting) is forwarded to `handleError` instead of being propagated to
 * the caller.
 *
 * @param {() => any} fn callback to invoke once `connectedPromise` has resolved; may return a promise
 * @returns {Promise<void>}
 */
async function runWhenConnected (fn) {
  try {
    // Nothing may touch the clients before the connection is up.
    await connectedPromise
    await fn()
  } catch (failure) {
    handleError(failure)
  }
}
/**
 * Register `handler` for `eventName` by pattern-subscribing to the prefixed
 * redis channel with a wrapper that decodes the message.
 *
 * The wrapper is remembered in `handlersByEvent` (eventName -> handler ->
 * wrapper) so that `removeListener` can later unsubscribe exactly this wrapper.
 *
 * @param {string} eventName name of the event
 * @param {Function} handler called with the arguments that were passed to `emit`
 * @param {boolean} [_once] when true, the listener is removed as soon as the first message arrives
 */
function addListener (eventName, handler, _once = false) {
  function actualHandler (message) {
    // Removal is requested before decoding, so a one-shot listener is consumed
    // even by a message that turns out to be invalid.
    if (_once) removeListener(eventName, handler)
    let args
    try {
      args = JSON.parse(message)
    } catch (ex) {
      return handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
    }
    // `emit` always publishes a JSON array. Anything else (e.g. from a foreign
    // publisher) must not be spread: a string would be split into characters
    // and an object or number would throw a TypeError.
    if (!Array.isArray(args)) {
      return handleError(new Error(`Invalid message received (expected a JSON array)! Channel: ${eventName} Message: ${message}`))
    }
    return handler(...args)
  }

  let handlersByThisEventName = handlersByEvent.get(eventName)
  if (handlersByThisEventName == null) {
    // Must be a Map, not a WeakMap: removeListener relies on `.size` to drop
    // the per-event entry once it is empty, and WeakMap has no `size`.
    // `actualHandler` keeps `handler` strongly reachable anyway.
    handlersByThisEventName = new Map()
    handlersByEvent.set(eventName, handlersByThisEventName)
  }
  handlersByThisEventName.set(handler, actualHandler)

  runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
}
/**
 * Subscribe a handler to every occurrence of an event.
 *
 * `'error'` listeners are attached to the local `errorEmitter`; every other
 * event name is registered through `addListener`.
 *
 * @param {string} eventName name of the event
 * @param {any} handler the handler of the event
 */
function on (eventName, handler) {
  const isErrorEvent = eventName === 'error'
  return isErrorEvent
    ? errorEmitter.on('error', handler)
    : addListener(eventName, handler)
}
/**
 * Subscribe a handler that is triggered at most once.
 *
 * `'error'` listeners are attached to the local `errorEmitter`; every other
 * event name is registered through `addListener` in one-shot mode.
 *
 * @param {string} eventName name of the event
 * @param {any} handler the handler of the event
 */
function once (eventName, handler) {
  if (eventName !== 'error') {
    return addListener(eventName, handler, true)
  }
  return errorEmitter.once('error', handler)
}
/**
 * Announce the occurrence of an event.
 *
 * The arguments are serialized as a JSON array and published on the prefixed
 * channel once the connection is ready. Nothing is returned; failures are
 * handled inside `runWhenConnected`.
 *
 * @param {string} eventName name of the event
 * @param {...any} args values handed to the listeners; must be JSON-serializable
 */
function emit (eventName, ...args) {
  // Serialization happens inside the callback, i.e. only when publishing.
  const publishEvent = () => {
    const channel = getPrefixedEventName(eventName)
    const payload = JSON.stringify(args)
    return publisher.publish(channel, payload)
  }
  runWhenConnected(publishEvent)
}
/**
 * Remove an event listener.
 *
 * `'error'` listeners are removed from the local `errorEmitter`. For any
 * other event the wrapper stored for `handler` is looked up and, if found,
 * unsubscribed from the prefixed redis channel once the connection is ready.
 * Unknown events or handlers are ignored.
 *
 * @param {string} eventName name of the event
 * @param {any} handler the handler of the event to remove
 * @returns {any} the result of `errorEmitter.removeListener` for `'error'`,
 *   otherwise the promise returned by `runWhenConnected`
 */
function removeListener (eventName, handler) {
  if (eventName === 'error') {
    return errorEmitter.removeListener('error', handler)
  }

  const unsubscribe = () => {
    const registered = handlersByEvent.get(eventName)
    const wrapped = registered == null ? undefined : registered.get(handler)
    if (wrapped == null) return undefined

    registered.delete(handler)
    // Forget the event altogether once its registry reports no handlers left.
    if (registered.size === 0) {
      handlersByEvent.delete(eventName)
    }

    const channel = getPrefixedEventName(eventName)
    return subscriber.pUnsubscribe(channel, wrapped)
  }

  return runWhenConnected(unsubscribe)
}
/**
 * Remove all listeners of an event.
 *
 * For `'error'` this clears the local `errorEmitter`. For any other event the
 * bookkeeping entry is dropped and the prefixed redis channel is unsubscribed
 * (without naming a specific listener) once the connection is ready.
 *
 * @param {string} eventName name of the event
 * @returns {any} the result of `errorEmitter.removeAllListeners` for `'error'`,
 *   otherwise the promise returned by `runWhenConnected`
 */
function removeAllListeners (eventName) {
  if (eventName === 'error') {
    return errorEmitter.removeAllListeners(eventName)
  }

  const dropEvent = () => {
    handlersByEvent.delete(eventName)
    const channel = getPrefixedEventName(eventName)
    return subscriber.pUnsubscribe(channel)
  }

  return runWhenConnected(dropEvent)
}
// Public API handed to callers: the listener-management and emit functions defined above.
return {
on,
once,
emit,
removeListener,
removeAllListeners,
}
}