Skip to content

Commit

Permalink
fixup! finish redis emitter rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
aduh95 committed Jul 25, 2022
1 parent 8478ecd commit 6da020b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 21 deletions.
1 change: 1 addition & 0 deletions packages/@uppy/companion/package.json
Expand Up @@ -94,6 +94,7 @@
"lib/"
],
"jest": {
"testEnvironment": "node",
"testTimeout": 10000,
"automock": false,
"collectCoverage": true,
Expand Down
49 changes: 33 additions & 16 deletions packages/@uppy/companion/src/server/Uploader.js
Expand Up @@ -4,10 +4,13 @@ const { randomUUID } = require('node:crypto')
const isObject = require('isobject')
const validator = require('validator')
const request = require('request')
const { pipeline } = require('node:stream/promises')
const { pipeline: pipelineCb } = require('node:stream')
const { join } = require('node:path')
const fs = require('node:fs')
const { once } = require('node:events')
const { promisify } = require('node:util')

// TODO move to `require('streams/promises').pipeline` when dropping support for Node.js 14.x.
const pipeline = promisify(pipelineCb)

const { createReadStream, createWriteStream, ReadStream } = fs
const { stat, unlink } = fs.promises
Expand Down Expand Up @@ -358,21 +361,35 @@ class Uploader {
async awaitReady (timeout) {
logger.debug('waiting for socket connection', 'uploader.socket.wait', this.shortToken)

// todo use AbortSignal.timeout(timeout) once supported by jest
function createAbortSignal (ms) {
const controller = new AbortController()
setTimeout(() => controller.abort(), ms)
return controller.signal
}
// TODO: replace the Promise constructor call when dropping support for Node.js <16 with
// await once(emitter, eventName, timeout && { signal: AbortSignal.timeout(timeout) })
await new Promise((resolve, reject) => {
const eventName = `connection:${this.token}`
let timer
let onEvent

const eventName = `connection:${this.token}`
try {
await once(emitter(), eventName, timeout && { signal: createAbortSignal(timeout) })
logger.debug('socket connection received', 'uploader.socket.wait', this.shortToken)
} catch (err) {
if (err.code === 'ABORT_ERR') throw new Error('Timed out waiting for socket connection')
throw err
}
function cleanup () {
emitter().removeListener(eventName, onEvent)
clearTimeout(timer)
}

if (timeout) {
// Need to timeout after a while, or we could leak emitters
timer = setTimeout(() => {
cleanup()
reject(new Error('Timed out waiting for socket connection'))
}, timeout)
}

onEvent = () => {
cleanup()
resolve()
}

emitter().once(eventName, onEvent)
})

logger.debug('socket connection received', 'uploader.socket.wait', this.shortToken)
}

/**
Expand Down
12 changes: 8 additions & 4 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
Expand Up @@ -45,8 +45,11 @@ module.exports = (redisUrl, redisPubSubScope) => {
return handler(...args)
}

if (!handlersByEvent.get(eventName)) handlersByEvent.set(eventName, new Map())
const handlersByThisEventName = handlersByEvent.get(eventName)
let handlersByThisEventName = handlersByEvent.get(eventName)
if (handlersByThisEventName == null) {
handlersByThisEventName = new WeakMap()
handlersByEvent.set(eventName, handlersByThisEventName)
}
handlersByThisEventName.set(handler, actualHandler)

runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
Expand Down Expand Up @@ -96,10 +99,11 @@ module.exports = (redisUrl, redisPubSubScope) => {

return runWhenConnected(() => {
const handlersByThisEventName = handlersByEvent.get(eventName)
if (!handlersByThisEventName) return undefined
if (handlersByThisEventName == null) return undefined

const actualHandler = handlersByThisEventName.get(handler)
if (!actualHandler) return undefined
if (actualHandler == null) return undefined

handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)

Expand Down
2 changes: 1 addition & 1 deletion website/src/docs/companion.md
Expand Up @@ -42,7 +42,7 @@ If you don’t have a Node.js project with a `package.json` you might want to in

### Prerequisite

Since Companion v4, you now need to be running `node.js >= v16.14.0` to use Companion.
Since v2, you now need to be running `node.js >= v10.20.1` to use Companion. Please see [Migrating v1 to v2](#Migrating-v1-to-v2)

Unfortunately, Windows is not a supported platform right now. It may work, and we’re happy to accept improvements in this area, but we can’t provide support.

Expand Down

0 comments on commit 6da020b

Please sign in to comment.