Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@uppy/companion: upgrade redis to version 4.x #3589

Merged
merged 5 commits into from Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions .yarn/patches/@types-connect-redis-npm-0.0.18-4fd2b614d3
@@ -0,0 +1,13 @@
diff --git a/index.d.ts b/index.d.ts
index 413b15edf95c12b1b176279c82b3a9c3f06ade20..0158f6f7a95935acbb67aeea00cb94c241475565 100755
--- a/index.d.ts
+++ b/index.d.ts
@@ -19,7 +19,7 @@ declare module 'connect-redis' {
function s(options: (options?: session.SessionOptions) => express.RequestHandler): s.RedisStore;

namespace s {
- type Client = redis.RedisClient | ioRedis.Redis | ioRedis.Cluster;
+ type Client = redis.RedisClientType | ioRedis.Redis | ioRedis.Cluster;
interface RedisStore extends session.Store {
new (options: RedisStoreOptions): RedisStore;
client: Client;
2 changes: 1 addition & 1 deletion examples/angular-example/package.json
Expand Up @@ -45,7 +45,7 @@
"@typescript-eslint/parser": "^5.0.0",
"eslint": "^8.0.0",
"eslint-plugin-import": "^2.22.1",
"eslint-plugin-jsdoc": "^37.0.0",
"eslint-plugin-jsdoc": "^38.0.0",
"eslint-plugin-prefer-arrow": "^1.2.3",
"jasmine-core": "~3.6.0",
"jasmine-spec-reporter": "~5.0.0",
Expand Down
8 changes: 4 additions & 4 deletions package.json
Expand Up @@ -77,13 +77,13 @@
"eslint-plugin-compat": "^4.0.0",
"eslint-plugin-cypress": "^2.12.1",
"eslint-plugin-import": "^2.25.2",
"eslint-plugin-jest": "^25.0.0",
"eslint-plugin-jsdoc": "^37.0.0",
"eslint-plugin-jest": "^26.0.0",
"eslint-plugin-jsdoc": "^38.0.0",
"eslint-plugin-jsx-a11y": "^6.4.1",
"eslint-plugin-markdown": "^2.2.0",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prefer-import": "^0.0.1",
"eslint-plugin-promise": "^5.1.1",
"eslint-plugin-promise": "^6.0.0",
"eslint-plugin-react": "^7.22.0",
"eslint-plugin-react-hooks": "^4.2.0",
"eslint-plugin-unicorn": "^43.0.0",
Expand Down Expand Up @@ -200,7 +200,7 @@
]
},
"resolutions": {
"@types/redis": "2",
"@types/connect-redis@0.0.18": "patch:@types/connect-redis@npm:0.0.18#.yarn/patches/@types-connect-redis-npm-0.0.18-4fd2b614d3",
"@types/eslint@^7.2.13": "^8.2.0",
"@types/react": "^17",
"@types/webpack-dev-server": "^4",
Expand Down
7 changes: 3 additions & 4 deletions packages/@uppy/companion/package.json
Expand Up @@ -34,7 +34,7 @@
"body-parser": "1.19.0",
"chalk": "2.4.2",
"common-tags": "1.8.0",
"connect-redis": "4.0.3",
"connect-redis": "6.1.3",
aduh95 marked this conversation as resolved.
Show resolved Hide resolved
"cookie-parser": "1.4.6",
"cors": "^2.8.5",
"escape-goat": "3.0.0",
Expand All @@ -55,11 +55,10 @@
"moment-timezone": "^0.5.31",
"morgan": "1.10.0",
"ms": "2.1.2",
"node-redis-pubsub": "^5.0.0",
"node-schedule": "1.3.2",
"prom-client": "12.0.0",
"purest": "3.1.0",
"redis": "3.1.1",
"redis": "4.2.0",
"request": "2.88.2",
"semver": "6.3.0",
"serialize-error": "^2.1.0",
Expand All @@ -70,7 +69,7 @@
},
"devDependencies": {
"@types/compression": "1.7.0",
"@types/connect-redis": "0.0.17",
"@types/connect-redis": "0.0.18",
"@types/cookie-parser": "1.4.2",
"@types/cors": "2.8.6",
"@types/eslint": "^8.2.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/@uppy/companion/src/server/Uploader.js
Expand Up @@ -340,7 +340,7 @@ class Uploader {
size,
companionOptions: req.companion.options,
pathPrefix: `${req.companion.options.filePath}`,
storage: redis.client(),
storage: redis.client()?.v4,
s3: req.companion.s3Client ? {
client: req.companion.s3Client,
options: req.companion.options.s3,
Expand Down
103 changes: 86 additions & 17 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
@@ -1,49 +1,114 @@
const NRP = require('node-redis-pubsub')
const redis = require('redis')
const { EventEmitter } = require('node:events')

/**
* This module simulates the builtin events.EventEmitter but with the use of redis.
* This is useful for when companion is running on multiple instances and events need
* to be distributed across.
*/
module.exports = (redisUrl, redisPubSubScope) => {
const nrp = new NRP({ url: redisUrl, scope: redisPubSubScope })
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redis.createClient({ url: redisUrl })
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
return subscriber.connect()
})
aduh95 marked this conversation as resolved.
Show resolved Hide resolved
aduh95 marked this conversation as resolved.
Show resolved Hide resolved

const handlersByEvent = new Map()

const errorEmitter = new EventEmitter()
const handleError = (err) => errorEmitter.emit('error', err)

connectedPromise.catch((err) => handleError(err))

async function runWhenConnected (fn) {
try {
await connectedPromise
await fn()
} catch (err) {
handleError(err)
}
}

function addListener (eventName, handler, _once = false) {
function actualHandler (message) {
if (_once) removeListener(eventName, handler)
let args
try {
args = JSON.parse(message)
} catch (ex) {
return handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
}
return handler(...args)
}

let handlersByThisEventName = handlersByEvent.get(eventName)
aduh95 marked this conversation as resolved.
Show resolved Hide resolved
if (handlersByThisEventName == null) {
handlersByThisEventName = new WeakMap()
handlersByEvent.set(eventName, handlersByThisEventName)
}
handlersByThisEventName.set(handler, actualHandler)

runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
}

/**
* Add an event listener
*
* @param {string} eventName name of the event
* @param {any} handler the handler of the event
*/
function on (eventName, handler) {
nrp.on(eventName, handler)
if (eventName === 'error') return errorEmitter.on('error', handler)

return addListener(eventName, handler)
}

/**
* Add a one-off event listener
* Add an event listener (will be triggered at most once)
*
* @param {string} eventName name of the event
* @param {Function} handler the handler of the event
* @param {any} handler the handler of the event
*/
function once (eventName, handler) {
aduh95 marked this conversation as resolved.
Show resolved Hide resolved
const off = nrp.on(eventName, (message) => {
handler(message)
off()
})
if (eventName === 'error') return errorEmitter.once('error', handler)

return addListener(eventName, handler, true)
}

/**
* Announce the occurence of an event
*
* @param {string} eventName name of the event
* @param {object} message the message to pass along with the event
*/
function emit (eventName, message) {
return nrp.emit(eventName, message || {})
function emit (eventName, ...args) {
runWhenConnected(() => publisher.publish(getPrefixedEventName(eventName), JSON.stringify(args)))
}

/**
* Remove an event listener
*
* @param {string} eventName name of the event
* @param {Function} handler the handler of the event to remove
* @param {any} handler the handler of the event to remove
*/
function removeListener (eventName, handler) {
nrp.receiver.removeListener(eventName, handler)
nrp.receiver.punsubscribe(`${nrp.prefix}${eventName}`)
if (eventName === 'error') return errorEmitter.removeListener('error', handler)

return runWhenConnected(() => {
const handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) return undefined

const actualHandler = handlersByThisEventName.get(handler)
if (actualHandler == null) return undefined

handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)

return subscriber.pUnsubscribe(getPrefixedEventName(eventName), actualHandler)
})
}

/**
Expand All @@ -52,8 +117,12 @@ module.exports = (redisUrl, redisPubSubScope) => {
* @param {string} eventName name of the event
*/
function removeAllListeners (eventName) {
nrp.receiver.removeAllListeners(eventName)
nrp.receiver.punsubscribe(`${nrp.prefix}${eventName}`)
if (eventName === 'error') return errorEmitter.removeAllListeners(eventName)

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.pUnsubscribe(getPrefixedEventName(eventName))
})
}

return {
Expand Down
18 changes: 15 additions & 3 deletions packages/@uppy/companion/src/server/redis.js
@@ -1,5 +1,6 @@
const redis = require('redis')
const merge = require('lodash.merge')

const logger = require('./logger')

let redisClient

Expand All @@ -11,7 +12,18 @@ let redisClient
*/
function createClient (opts) {
if (!redisClient) {
redisClient = redis.createClient(opts)
// todo remove legacyMode when fixed: https://github.com/tj/connect-redis/issues/361
redisClient = redis.createClient({ ...opts, legacyMode: true })
aduh95 marked this conversation as resolved.
Show resolved Hide resolved
aduh95 marked this conversation as resolved.
Show resolved Hide resolved

;(async () => {
try {
// fire and forget.
// any requests made on the client before connection is established will be auto-queued by node-redis
await redisClient.connect()
} catch (err) {
logger.error(err.message, 'redis.error')
}
})()
}

return redisClient
Expand All @@ -22,5 +34,5 @@ module.exports.client = (companionOptions) => {
return redisClient
}

return createClient(merge({ url: companionOptions.redisUrl }, companionOptions.redisOptions))
return createClient({ ...companionOptions.redisOptions, url: companionOptions.redisUrl })
}
7 changes: 3 additions & 4 deletions packages/@uppy/companion/src/server/socket.js
Expand Up @@ -12,7 +12,7 @@ const { STORAGE_PREFIX, shortenToken } = require('./Uploader')
*/
module.exports = (server) => {
const wss = new SocketServer({ server })
const redisClient = redis.client()
const redisClient = redis.client()?.v4

// A new connection is usually created when an upload begins,
// or when connection fails while an upload is on-going and,
Expand All @@ -38,13 +38,12 @@ module.exports = (server) => {
// if the redisClient is available, then we attempt to check the storage
// if we have any already stored progress data on the upload.
if (redisClient) {
redisClient.get(`${STORAGE_PREFIX}:${token}`, (err, data) => {
if (err) logger.error(err, 'socket.redis.error', shortenToken(token))
redisClient.get(`${STORAGE_PREFIX}:${token}`).then((data) => {
if (data) {
const dataObj = JSON.parse(data.toString())
if (dataObj.action) sendProgress(dataObj)
}
})
}).catch((err) => logger.error(err, 'socket.redis.error', shortenToken(token)))
}

emitter().emit(`connection:${token}`)
Expand Down