From a81fc36c14a8ff85a5e43beb2f12a6f44cb6f34e Mon Sep 17 00:00:00 2001 From: Antoine du Hamel Date: Mon, 25 Jul 2022 15:14:13 +0200 Subject: [PATCH] @uppy/companion: upgrade `redis` to version 4.x (#3589) Co-authored-by: Mikael Finstad --- package.json | 7 +- src/server/Uploader.js | 2 +- src/server/emitter/redis-emitter.js | 103 +++++++++++++++++++++++----- src/server/redis.js | 18 ++++- src/server/socket.js | 7 +- 5 files changed, 108 insertions(+), 29 deletions(-) diff --git a/package.json b/package.json index fdd9062dd7..2e6d991651 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,7 @@ "body-parser": "1.19.0", "chalk": "2.4.2", "common-tags": "1.8.0", - "connect-redis": "4.0.3", + "connect-redis": "6.1.3", "cookie-parser": "1.4.6", "cors": "^2.8.5", "escape-goat": "3.0.0", @@ -55,11 +55,10 @@ "moment-timezone": "^0.5.31", "morgan": "1.10.0", "ms": "2.1.2", - "node-redis-pubsub": "^5.0.0", "node-schedule": "1.3.2", "prom-client": "12.0.0", "purest": "3.1.0", - "redis": "3.1.1", + "redis": "4.2.0", "request": "2.88.2", "semver": "6.3.0", "serialize-error": "^2.1.0", @@ -70,7 +69,7 @@ }, "devDependencies": { "@types/compression": "1.7.0", - "@types/connect-redis": "0.0.17", + "@types/connect-redis": "0.0.18", "@types/cookie-parser": "1.4.2", "@types/cors": "2.8.6", "@types/eslint": "^8.2.0", diff --git a/src/server/Uploader.js b/src/server/Uploader.js index f661770893..8ddc8fc4d4 100644 --- a/src/server/Uploader.js +++ b/src/server/Uploader.js @@ -340,7 +340,7 @@ class Uploader { size, companionOptions: req.companion.options, pathPrefix: `${req.companion.options.filePath}`, - storage: redis.client(), + storage: redis.client()?.v4, s3: req.companion.s3Client ? { client: req.companion.s3Client, options: req.companion.options.s3, diff --git a/src/server/emitter/redis-emitter.js b/src/server/emitter/redis-emitter.js index 58e02a322c..3336b0699b 100644 --- a/src/server/emitter/redis-emitter.js +++ b/src/server/emitter/redis-emitter.js @@ -1,4 +1,5 @@ -const NRP = require('node-redis-pubsub') +const redis = require('redis') +const { EventEmitter } = require('node:events') /** * This module simulates the builtin events.EventEmitter but with the use of redis. @@ -6,44 +7,108 @@ const NRP = require('node-redis-pubsub') * to be distributed across. */ module.exports = (redisUrl, redisPubSubScope) => { - const nrp = new NRP({ url: redisUrl, scope: redisPubSubScope }) + const prefix = redisPubSubScope ? `${redisPubSubScope}:` : '' + const getPrefixedEventName = (eventName) => `${prefix}${eventName}` + const publisher = redis.createClient({ url: redisUrl }) + let subscriber + const connectedPromise = publisher.connect().then(() => { + subscriber = publisher.duplicate() + return subscriber.connect() + }) + + const handlersByEvent = new Map() + + const errorEmitter = new EventEmitter() + const handleError = (err) => errorEmitter.emit('error', err) + + connectedPromise.catch((err) => handleError(err)) + + async function runWhenConnected (fn) { + try { + await connectedPromise + await fn() + } catch (err) { + handleError(err) + } + } + + function addListener (eventName, handler, _once = false) { + function actualHandler (message) { + if (_once) removeListener(eventName, handler) + let args + try { + args = JSON.parse(message) + } catch (ex) { + return handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`)) + } + return handler(...args) + } + + let handlersByThisEventName = handlersByEvent.get(eventName) + if (handlersByThisEventName == null) { + handlersByThisEventName = new WeakMap() + handlersByEvent.set(eventName, handlersByThisEventName) + } + handlersByThisEventName.set(handler, actualHandler) + + runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler)) + } + + /** + * Add an event listener + * + * @param {string} eventName name of the event + * @param {any} handler the handler of the event + */ function on (eventName, handler) { - nrp.on(eventName, handler) + if (eventName === 'error') return errorEmitter.on('error', handler) + + return addListener(eventName, handler) } /** - * Add a one-off event listener + * Add an event listener (will be triggered at most once) * * @param {string} eventName name of the event - * @param {Function} handler the handler of the event + * @param {any} handler the handler of the event */ function once (eventName, handler) { - const off = nrp.on(eventName, (message) => { - handler(message) - off() - }) + if (eventName === 'error') return errorEmitter.once('error', handler) + + return addListener(eventName, handler, true) } /** * Announce the occurence of an event * * @param {string} eventName name of the event - * @param {object} message the message to pass along with the event */ - function emit (eventName, message) { - return nrp.emit(eventName, message || {}) + function emit (eventName, ...args) { + runWhenConnected(() => publisher.publish(getPrefixedEventName(eventName), JSON.stringify(args))) } /** * Remove an event listener * * @param {string} eventName name of the event - * @param {Function} handler the handler of the event to remove + * @param {any} handler the handler of the event to remove */ function removeListener (eventName, handler) { - nrp.receiver.removeListener(eventName, handler) - nrp.receiver.punsubscribe(`${nrp.prefix}${eventName}`) + if (eventName === 'error') return errorEmitter.removeListener('error', handler) + + return runWhenConnected(() => { + const handlersByThisEventName = handlersByEvent.get(eventName) + if (handlersByThisEventName == null) return undefined + + const actualHandler = handlersByThisEventName.get(handler) + if (actualHandler == null) return undefined + + handlersByThisEventName.delete(handler) + if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName) + + return subscriber.pUnsubscribe(getPrefixedEventName(eventName), actualHandler) + }) } /** @@ -52,8 +117,12 @@ module.exports = (redisUrl, redisPubSubScope) => { * @param {string} eventName name of the event */ function removeAllListeners (eventName) { - nrp.receiver.removeAllListeners(eventName) - nrp.receiver.punsubscribe(`${nrp.prefix}${eventName}`) + if (eventName === 'error') return errorEmitter.removeAllListeners(eventName) + + return runWhenConnected(() => { + handlersByEvent.delete(eventName) + return subscriber.pUnsubscribe(getPrefixedEventName(eventName)) + }) } return { diff --git a/src/server/redis.js b/src/server/redis.js index 4e2bf6c11d..d14dab7ecc 100644 --- a/src/server/redis.js +++ b/src/server/redis.js @@ -1,5 +1,6 @@ const redis = require('redis') -const merge = require('lodash.merge') + +const logger = require('./logger') let redisClient @@ -11,7 +12,18 @@ let redisClient */ function createClient (opts) { if (!redisClient) { - redisClient = redis.createClient(opts) + // todo remove legacyMode when fixed: https://github.com/tj/connect-redis/issues/361 + redisClient = redis.createClient({ ...opts, legacyMode: true }) + + ;(async () => { + try { + // fire and forget. + // any requests made on the client before connection is established will be auto-queued by node-redis + await redisClient.connect() + } catch (err) { + logger.error(err.message, 'redis.error') + } + })() } return redisClient @@ -22,5 +34,5 @@ module.exports.client = (companionOptions) => { return redisClient } - return createClient(merge({ url: companionOptions.redisUrl }, companionOptions.redisOptions)) + return createClient({ ...companionOptions.redisOptions, url: companionOptions.redisUrl }) } diff --git a/src/server/socket.js b/src/server/socket.js index d9654923ba..21cd9de28c 100644 --- a/src/server/socket.js +++ b/src/server/socket.js @@ -12,7 +12,7 @@ const { STORAGE_PREFIX, shortenToken } = require('./Uploader') */ module.exports = (server) => { const wss = new SocketServer({ server }) - const redisClient = redis.client() + const redisClient = redis.client()?.v4 // A new connection is usually created when an upload begins, // or when connection fails while an upload is on-going and, @@ -37,13 +37,12 @@ module.exports = (server) => { // if the redisClient is available, then we attempt to check the storage // if we have any already stored progress data on the upload. if (redisClient) { - redisClient.get(`${STORAGE_PREFIX}:${token}`, (err, data) => { - if (err) logger.error(err, 'socket.redis.error', shortenToken(token)) + redisClient.get(`${STORAGE_PREFIX}:${token}`).then((data) => { if (data) { const dataObj = JSON.parse(data.toString()) if (dataObj.action) sendProgress(dataObj) } - }) + }).catch((err) => logger.error(err, 'socket.redis.error', shortenToken(token))) } emitter().emit(`connection:${token}`)