Skip to content

Commit

Permalink
@uppy/companion: upgrade redis to version 4.x (#3589)
Browse files Browse the repository at this point in the history
Co-authored-by: Mikael Finstad <finstaden@gmail.com>
  • Loading branch information
aduh95 and mifi committed Jul 25, 2022
1 parent c28ec8d commit aee4d3f
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 160 deletions.
13 changes: 13 additions & 0 deletions .yarn/patches/@types-connect-redis-npm-0.0.18-4fd2b614d3
@@ -0,0 +1,13 @@
diff --git a/index.d.ts b/index.d.ts
index 413b15edf95c12b1b176279c82b3a9c3f06ade20..0158f6f7a95935acbb67aeea00cb94c241475565 100755
--- a/index.d.ts
+++ b/index.d.ts
@@ -19,7 +19,7 @@ declare module 'connect-redis' {
function s(options: (options?: session.SessionOptions) => express.RequestHandler): s.RedisStore;

namespace s {
- type Client = redis.RedisClient | ioRedis.Redis | ioRedis.Cluster;
+ type Client = redis.RedisClientType | ioRedis.Redis | ioRedis.Cluster;
interface RedisStore extends session.Store {
new (options: RedisStoreOptions): RedisStore;
client: Client;
2 changes: 1 addition & 1 deletion examples/angular-example/package.json
Expand Up @@ -45,7 +45,7 @@
"@typescript-eslint/parser": "^5.0.0",
"eslint": "^8.0.0",
"eslint-plugin-import": "^2.22.1",
"eslint-plugin-jsdoc": "^37.0.0",
"eslint-plugin-jsdoc": "^38.0.0",
"eslint-plugin-prefer-arrow": "^1.2.3",
"jasmine-core": "~3.6.0",
"jasmine-spec-reporter": "~5.0.0",
Expand Down
8 changes: 4 additions & 4 deletions package.json
Expand Up @@ -77,13 +77,13 @@
"eslint-plugin-compat": "^4.0.0",
"eslint-plugin-cypress": "^2.12.1",
"eslint-plugin-import": "^2.25.2",
"eslint-plugin-jest": "^25.0.0",
"eslint-plugin-jsdoc": "^37.0.0",
"eslint-plugin-jest": "^26.0.0",
"eslint-plugin-jsdoc": "^38.0.0",
"eslint-plugin-jsx-a11y": "^6.4.1",
"eslint-plugin-markdown": "^2.2.0",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prefer-import": "^0.0.1",
"eslint-plugin-promise": "^5.1.1",
"eslint-plugin-promise": "^6.0.0",
"eslint-plugin-react": "^7.22.0",
"eslint-plugin-react-hooks": "^4.2.0",
"eslint-plugin-unicorn": "^43.0.0",
Expand Down Expand Up @@ -200,7 +200,7 @@
]
},
"resolutions": {
"@types/redis": "2",
"@types/connect-redis@0.0.18": "patch:@types/connect-redis@npm:0.0.18#.yarn/patches/@types-connect-redis-npm-0.0.18-4fd2b614d3",
"@types/eslint@^7.2.13": "^8.2.0",
"@types/react": "^17",
"@types/webpack-dev-server": "^4",
Expand Down
7 changes: 3 additions & 4 deletions packages/@uppy/companion/package.json
Expand Up @@ -34,7 +34,7 @@
"body-parser": "1.19.0",
"chalk": "2.4.2",
"common-tags": "1.8.0",
"connect-redis": "4.0.3",
"connect-redis": "6.1.3",
"cookie-parser": "1.4.6",
"cors": "^2.8.5",
"escape-goat": "3.0.0",
Expand All @@ -55,11 +55,10 @@
"moment-timezone": "^0.5.31",
"morgan": "1.10.0",
"ms": "2.1.2",
"node-redis-pubsub": "^5.0.0",
"node-schedule": "1.3.2",
"prom-client": "12.0.0",
"purest": "3.1.0",
"redis": "3.1.1",
"redis": "4.2.0",
"request": "2.88.2",
"semver": "6.3.0",
"serialize-error": "^2.1.0",
Expand All @@ -70,7 +69,7 @@
},
"devDependencies": {
"@types/compression": "1.7.0",
"@types/connect-redis": "0.0.17",
"@types/connect-redis": "0.0.18",
"@types/cookie-parser": "1.4.2",
"@types/cors": "2.8.6",
"@types/eslint": "^8.2.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/@uppy/companion/src/server/Uploader.js
Expand Up @@ -340,7 +340,7 @@ class Uploader {
size,
companionOptions: req.companion.options,
pathPrefix: `${req.companion.options.filePath}`,
storage: redis.client(),
storage: redis.client()?.v4,
s3: req.companion.s3Client ? {
client: req.companion.s3Client,
options: req.companion.options.s3,
Expand Down
103 changes: 86 additions & 17 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
@@ -1,49 +1,114 @@
const NRP = require('node-redis-pubsub')
const redis = require('redis')
const { EventEmitter } = require('node:events')

/**
* This module simulates the builtin events.EventEmitter but with the use of redis.
* This is useful for when companion is running on multiple instances and events need
* to be distributed across.
*/
module.exports = (redisUrl, redisPubSubScope) => {
const nrp = new NRP({ url: redisUrl, scope: redisPubSubScope })
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redis.createClient({ url: redisUrl })
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
return subscriber.connect()
})

const handlersByEvent = new Map()

const errorEmitter = new EventEmitter()
const handleError = (err) => errorEmitter.emit('error', err)

connectedPromise.catch((err) => handleError(err))

async function runWhenConnected (fn) {
try {
await connectedPromise
await fn()
} catch (err) {
handleError(err)
}
}

function addListener (eventName, handler, _once = false) {
function actualHandler (message) {
if (_once) removeListener(eventName, handler)
let args
try {
args = JSON.parse(message)
} catch (ex) {
return handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
}
return handler(...args)
}

let handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) {
handlersByThisEventName = new WeakMap()
handlersByEvent.set(eventName, handlersByThisEventName)
}
handlersByThisEventName.set(handler, actualHandler)

runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
}

/**
* Add an event listener
*
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function on (eventName, handler) {
nrp.on(eventName, handler)
if (eventName === 'error') return errorEmitter.on('error', handler)

return addListener(eventName, handler)
}

/**
* Add a one-off event listener
* Add an event listener (will be triggered at most once)
*
* @param {string} eventName name of the event
* @param {Function} handler the handler of the event
* @param {any} handler the handler of the event
*/
function once (eventName, handler) {
const off = nrp.on(eventName, (message) => {
handler(message)
off()
})
if (eventName === 'error') return errorEmitter.once('error', handler)

return addListener(eventName, handler, true)
}

/**
* Announce the occurence of an event
*
* @param {string} eventName name of the event
* @param {object} message the message to pass along with the event
*/
function emit (eventName, message) {
return nrp.emit(eventName, message || {})
function emit (eventName, ...args) {
runWhenConnected(() => publisher.publish(getPrefixedEventName(eventName), JSON.stringify(args)))
}

/**
* Remove an event listener
*
* @param {string} eventName name of the event
* @param {Function} handler the handler of the event to remove
* @param {any} handler the handler of the event to remove
*/
function removeListener (eventName, handler) {
nrp.receiver.removeListener(eventName, handler)
nrp.receiver.punsubscribe(`${nrp.prefix}${eventName}`)
if (eventName === 'error') return errorEmitter.removeListener('error', handler)

return runWhenConnected(() => {
const handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) return undefined

const actualHandler = handlersByThisEventName.get(handler)
if (actualHandler == null) return undefined

handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)

return subscriber.pUnsubscribe(getPrefixedEventName(eventName), actualHandler)
})
}

/**
Expand All @@ -52,8 +117,12 @@ module.exports = (redisUrl, redisPubSubScope) => {
* @param {string} eventName name of the event
*/
function removeAllListeners (eventName) {
nrp.receiver.removeAllListeners(eventName)
nrp.receiver.punsubscribe(`${nrp.prefix}${eventName}`)
if (eventName === 'error') return errorEmitter.removeAllListeners(eventName)

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.pUnsubscribe(getPrefixedEventName(eventName))
})
}

return {
Expand Down
18 changes: 15 additions & 3 deletions packages/@uppy/companion/src/server/redis.js
@@ -1,5 +1,6 @@
const redis = require('redis')
const merge = require('lodash.merge')

const logger = require('./logger')

let redisClient

Expand All @@ -11,7 +12,18 @@ let redisClient
*/
function createClient (opts) {
if (!redisClient) {
redisClient = redis.createClient(opts)
// todo remove legacyMode when fixed: https://github.com/tj/connect-redis/issues/361
redisClient = redis.createClient({ ...opts, legacyMode: true })

;(async () => {
try {
// fire and forget.
// any requests made on the client before connection is established will be auto-queued by node-redis
await redisClient.connect()
} catch (err) {
logger.error(err.message, 'redis.error')
}
})()
}

return redisClient
Expand All @@ -22,5 +34,5 @@ module.exports.client = (companionOptions) => {
return redisClient
}

return createClient(merge({ url: companionOptions.redisUrl }, companionOptions.redisOptions))
return createClient({ ...companionOptions.redisOptions, url: companionOptions.redisUrl })
}
7 changes: 3 additions & 4 deletions packages/@uppy/companion/src/server/socket.js
Expand Up @@ -12,7 +12,7 @@ const { STORAGE_PREFIX, shortenToken } = require('./Uploader')
*/
module.exports = (server) => {
const wss = new SocketServer({ server })
const redisClient = redis.client()
const redisClient = redis.client()?.v4

// A new connection is usually created when an upload begins,
// or when connection fails while an upload is on-going and,
Expand All @@ -37,13 +37,12 @@ module.exports = (server) => {
// if the redisClient is available, then we attempt to check the storage
// if we have any already stored progress data on the upload.
if (redisClient) {
redisClient.get(`${STORAGE_PREFIX}:${token}`, (err, data) => {
if (err) logger.error(err, 'socket.redis.error', shortenToken(token))
redisClient.get(`${STORAGE_PREFIX}:${token}`).then((data) => {
if (data) {
const dataObj = JSON.parse(data.toString())
if (dataObj.action) sendProgress(dataObj)
}
})
}).catch((err) => logger.error(err, 'socket.redis.error', shortenToken(token)))
}

emitter().emit(`connection:${token}`)
Expand Down

0 comments on commit aee4d3f

Please sign in to comment.