From 3835149543ffddeb3ebdc4978b9fe6e4a0726712 Mon Sep 17 00:00:00 2001 From: Mikael Finstad Date: Mon, 1 Nov 2021 16:35:25 +0700 Subject: [PATCH] Rewrite Companion providers to use streams to allow simultaneous upload/download without saving to disk (#3159) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * rewrite to async/await * Only fetch size (HEAD) if needed #3034 * Update packages/@uppy/companion/src/server/controllers/url.js Co-authored-by: Antoine du Hamel * Change HEAD to GET in getURLMeta and abort request immediately upon response headers received https://github.com/transloadit/uppy/issues/3034#issuecomment-908059234 * fix lint * fix lint * cut off length of file names or else we get "MetadataTooLarge: Your metadata headers exceed the maximum allowed metadata size" in tus / S3 * try to fix flaky test * remove iife and cleanup code a bit * fix lint by reordering code * rename Uploader to MultipartUploader * Rewrite Uploader to use fs-capacitor #3098 This allows for upload to start almost immediately without having to first download the file. And it allows for uploading bigger files, because transloadit assembly will not timeout, as it will get upload progress events all the time. No longer need for illusive progress. Also fix eslint warnings and simplify logic Still TODO: TUS pause/resume has a bug: https://github.com/tus/tus-js-client/issues/275 * add comment in dev Dashboard and pull out variable * fix a bug where remote xhr upload would ignore progress events in the UI * fix bug where s3 multipart cancel wasn't working * fix also cancel for xhr * Rewrite providers to use streams This removes the need for disk space as data will be buffered in memory and backpressure will be respected https://github.com/transloadit/uppy/issues/3098#issuecomment-907763809 All providers "download" methods will now return a { stream } which can be consumed by uploader. 
Also: - Remove capacitor (no longer needed) - Change Provider/SearchProvider API to async (Breaking change for custom companion providers) - Fix the case with unknown length streams (zoom / google drive). Need to be downloaded first - rewrite controllers deauth-callback, thumbnail, list, logout to async - getURLMeta: make sure size is never NaN (NaN gets converted to null in JSON.stringify when sent to client but not when used in backend) - fix purest mock (it wasn't returning statusCode on('response')) - add missing http mock for "request" for THUMBNAIL_URL and http://url.myendpoint.com/file (these request errors were never caught by tests previously) - "upload functions with tus protocol" test: move filename checking to new test where size is null. Fix broken expects - fix some lint * Implement streamingUpload flag COMPANION_STREAMING_UPLOAD Default to false due to backward compatibility If set to true, will start to upload files at the same time as downloading them, by piping the streams - Also implement progress for downloading too - and fix progress duplication logic - fix test that assumed file was fully downloaded after first progress event * rearrange validation logic * add COMPANION_STREAMING_UPLOAD to env.test.sh too * implement maxFileSize option in companion for both unknown length and known length downloads * fix bug * fix memory leak when non 200 status streams were being kept * fix lint * Add backward-compatibility for companion providers Implement a new static field "version" on providers, which when not set to 2, will cause a compatibility layer to be added for supporting old callback style provider api also fix some eslint and rename some vars * document new provider API * remove static as it doesn't work on node 10 * try to fix build issue * degrade to node 14 in github actions due to hitting this error: https://github.com/nodejs/node/issues/40030 https://github.com/transloadit/uppy/pull/3159/checks?check_run_id=3544858518 * pull out duplicated 
logic into reusable function * fix lint * make methods private * re-add unsplash download_location request got lost in merge * add try/catch as suggested https://github.com/transloadit/uppy/pull/3159#discussion_r727149263 * Only set default chunkSize if needed for being more compliant with previous behavior when streamingUpload = false * Improve flaky test Trying to fix this error: FAIL packages/@uppy/utils/src/delay.test.js ● delay › should reject when signal is aborted expect(received).toBeLessThan(expected) Expected: < 70 Received: 107 32 | const time = Date.now() - start 33 | expect(time).toBeGreaterThanOrEqual(30) > 34 | expect(time).toBeLessThan(70) | ^ 35 | }) 36 | }) 37 | at Object. (packages/@uppy/utils/src/delay.test.js:34:18) https://github.com/transloadit/uppy/runs/3984613454?check_suite_focus=true * Apply suggestions from code review Co-authored-by: Antoine du Hamel * fix review feedback & lint * Apply suggestions from code review Co-authored-by: Merlijn Vos * remove unneeded ts-ignore * Update packages/@uppy/companion/src/server/controllers/url.js Co-authored-by: Antoine du Hamel * Update packages/@uppy/companion/src/server/Uploader.js Co-authored-by: Antoine du Hamel * reduce nesting * fix lint * optimize promisify https://github.com/transloadit/uppy/pull/3159#discussion_r738160576 * Update packages/@uppy/companion/test/__tests__/uploader.js Co-authored-by: Antoine du Hamel Co-authored-by: Antoine du Hamel Co-authored-by: Merlijn Vos --- KUBERNETES.md | 1 + env.test.sh | 2 + env_example | 1 + package.json | 2 + src/companion.js | 1 + src/server/Uploader.js | 607 ++++++++++--------- src/server/controllers/deauth-callback.js | 22 +- src/server/controllers/get.js | 49 +- src/server/controllers/list.js | 21 +- src/server/controllers/logout.js | 34 +- src/server/controllers/thumbnail.js | 15 +- src/server/controllers/url.js | 85 ++- src/server/helpers/request.js | 5 +- src/server/helpers/upload.js | 46 ++ src/server/helpers/utils.js | 23 + 
src/server/provider/Provider.js | 22 +- src/server/provider/ProviderCompat.js | 54 ++ src/server/provider/SearchProvider.js | 14 +- src/server/provider/box/index.js | 51 +- src/server/provider/drive/index.js | 124 ++-- src/server/provider/dropbox/index.js | 63 +- src/server/provider/error.js | 2 + src/server/provider/facebook/index.js | 80 +-- src/server/provider/index.js | 72 ++- src/server/provider/instagram/graph/index.js | 79 +-- src/server/provider/onedrive/index.js | 59 +- src/server/provider/unsplash/index.js | 88 +-- src/server/provider/zoom/index.js | 109 ++-- src/standalone/helper.js | 2 + test/__mocks__/purest.js | 21 +- test/__tests__/providers.js | 15 +- test/__tests__/uploader.js | 149 ++++- test/__tests__/url.js | 14 +- test/fixtures/facebook.js | 3 - test/mocksocket.js | 8 + 35 files changed, 1126 insertions(+), 817 deletions(-) create mode 100644 src/server/helpers/upload.js create mode 100644 src/server/provider/ProviderCompat.js diff --git a/KUBERNETES.md b/KUBERNETES.md index fe80d612ba..3cb6405b97 100644 --- a/KUBERNETES.md +++ b/KUBERNETES.md @@ -25,6 +25,7 @@ data: COMPANION_DOMAIN: "YOUR SERVER DOMAIN" COMPANION_DOMAINS: "sub1.domain.com,sub2.domain.com,sub3.domain.com" COMPANION_PROTOCOL: "YOUR SERVER PROTOCOL" + COMPANION_STREAMING_UPLOAD: true COMPANION_REDIS_URL: redis://:superSecretPassword@uppy-redis.uppy.svc.cluster.local:6379 COMPANION_SECRET: "shh!Issa Secret!" 
COMPANION_DROPBOX_KEY: "YOUR DROPBOX KEY" diff --git a/env.test.sh b/env.test.sh index 02f94c9ef3..840e39a181 100644 --- a/env.test.sh +++ b/env.test.sh @@ -5,6 +5,8 @@ export COMPANION_SELF_ENDPOINT="localhost:3020" export COMPANION_HIDE_METRICS="false" export COMPANION_HIDE_WELCOME="false" +export COMPANION_STREAMING_UPLOAD="true" + export COMPANION_PROTOCOL="http" export COMPANION_DATADIR="./test/output" export COMPANION_SECRET="secret" diff --git a/env_example b/env_example index a9ecf32257..1bddc277c4 100644 --- a/env_example +++ b/env_example @@ -4,6 +4,7 @@ COMPANION_DOMAIN=uppy.xxxx.com COMPANION_SELF_ENDPOINT=uppy.xxxx.com COMPANION_HIDE_METRICS=false COMPANION_HIDE_WELCOME=false +COMPANION_STREAMING_UPLOAD=true COMPANION_PROTOCOL=https COMPANION_DATADIR=/mnt/uppy-server-data diff --git a/package.json b/package.json index 1bd9673609..7d40c7d87b 100644 --- a/package.json +++ b/package.json @@ -85,6 +85,8 @@ "@types/uuid": "3.4.7", "@types/webpack": "^5.28.0", "@types/ws": "6.0.4", + "into-stream": "^6.0.0", + "nock": "^13.1.3", "supertest": "3.4.2", "typescript": "~4.3" }, diff --git a/src/companion.js b/src/companion.js index 6cd313d58a..302b453116 100644 --- a/src/companion.js +++ b/src/companion.js @@ -39,6 +39,7 @@ const defaultOptions = { }, debug: true, logClientVersion: true, + streamingUpload: false, } // make the errors available publicly for custom providers diff --git a/src/server/Uploader.js b/src/server/Uploader.js index 318d513f7c..a5266dc446 100644 --- a/src/server/Uploader.js +++ b/src/server/Uploader.js @@ -1,10 +1,20 @@ -const fs = require('fs') -const path = require('path') const tus = require('tus-js-client') const uuid = require('uuid') const isObject = require('isobject') const validator = require('validator') const request = require('request') +// eslint-disable-next-line no-unused-vars +const { Readable, pipeline: pipelineCb } = require('stream') +const { join } = require('path') +const fs = require('fs') +const { promisify } = 
require('util') + +// TODO move to `require('streams/promises').pipeline` when dropping support for Node.js 14.x. +const pipeline = promisify(pipelineCb) + +const { createReadStream, createWriteStream, ReadStream } = fs +const { stat, unlink } = fs.promises + /** @type {any} */ // @ts-ignore - typescript resolves this this to a hoisted version of // serialize-error that ships with a declaration file, we are using a version @@ -26,6 +36,12 @@ const PROTOCOLS = Object.freeze({ tus: 'tus', }) +function exceedsMaxFileSize (maxFileSize, size) { + return maxFileSize && size && size > maxFileSize +} + +class AbortError extends Error {} + class Uploader { /** * Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart) @@ -34,19 +50,19 @@ class Uploader { * * @typedef {object} UploaderOptions * @property {string} endpoint - * @property {string=} uploadUrl + * @property {string} [uploadUrl] * @property {string} protocol - * @property {number} size - * @property {string=} fieldname + * @property {number} [size] + * @property {string} [fieldname] * @property {string} pathPrefix - * @property {any=} s3 + * @property {any} [s3] * @property {any} metadata * @property {any} companionOptions - * @property {any=} storage - * @property {any=} headers - * @property {string=} httpMethod - * @property {boolean=} useFormData - * @property {number=} chunkSize + * @property {any} [storage] + * @property {any} [headers] + * @property {string} [httpMethod] + * @property {boolean} [useFormData] + * @property {number} [chunkSize] * * @param {UploaderOptions} options */ @@ -58,23 +74,27 @@ class Uploader { this.options = options this.token = uuid.v4() - this.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}` + this.fileName = `${Uploader.FILE_NAME_PREFIX}-${this.token}` this.options.metadata = this.options.metadata || {} this.options.fieldname = this.options.fieldname || DEFAULT_FIELD_NAME + this.size = options.size this.uploadFileName 
= this.options.metadata.name ? this.options.metadata.name.substring(0, MAX_FILENAME_LENGTH) - : path.basename(this.path) - this.streamsEnded = false + : this.fileName + this.uploadStopped = false - this.writeStream = fs.createWriteStream(this.path, { mode: 0o666 }) // no executable files - .on('error', (err) => logger.error(`${err}`, 'uploader.write.error', this.shortToken)) - /** @type {number} */ - this.emittedProgress = 0 + + this.emittedProgress = {} this.storage = options.storage this._paused = false + this.downloadedBytes = 0 + + this.readStream = null + if (this.options.protocol === PROTOCOLS.tus) { emitter().on(`pause:${this.token}`, () => { + logger.debug('Received from client: pause', 'uploader', this.shortToken) this._paused = true if (this.tus) { this.tus.abort() @@ -82,20 +102,110 @@ class Uploader { }) emitter().on(`resume:${this.token}`, () => { + logger.debug('Received from client: resume', 'uploader', this.shortToken) this._paused = false if (this.tus) { this.tus.start() } }) + } - emitter().on(`cancel:${this.token}`, () => { - this._paused = true - if (this.tus) { - const shouldTerminate = !!this.tus.url - this.tus.abort(shouldTerminate).catch(() => {}) - } - this.cleanUp() - }) + emitter().on(`cancel:${this.token}`, () => { + logger.debug('Received from client: cancel', 'uploader', this.shortToken) + this._paused = true + if (this.tus) { + const shouldTerminate = !!this.tus.url + this.tus.abort(shouldTerminate).catch(() => {}) + } + this.abortReadStream(new AbortError()) + }) + } + + abortReadStream (err) { + this.uploadStopped = true + if (this.readStream) this.readStream.destroy(err) + } + + async _uploadByProtocol () { + // @todo a default protocol should not be set. We should ensure that the user specifies their protocol. 
+ const protocol = this.options.protocol || PROTOCOLS.multipart + + switch (protocol) { + case PROTOCOLS.multipart: + return this._uploadMultipart(this.readStream) + case PROTOCOLS.s3Multipart: + return this._uploadS3Multipart(this.readStream) + case PROTOCOLS.tus: + return this._uploadTus(this.readStream) + default: + throw new Error('Invalid protocol') + } + } + + async _downloadStreamAsFile (stream) { + this.tmpPath = join(this.options.pathPrefix, this.fileName) + + logger.debug('fully downloading file', 'uploader.download', this.shortToken) + const writeStream = createWriteStream(this.tmpPath) + + const onData = (chunk) => { + this.downloadedBytes += chunk.length + if (exceedsMaxFileSize(this.options.companionOptions.maxFileSize, this.downloadedBytes)) this.abortReadStream(new Error('maxFileSize exceeded')) + this.onProgress(0, undefined) + } + + stream.on('data', onData) + + await pipeline(stream, writeStream) + logger.debug('finished fully downloading file', 'uploader.download', this.shortToken) + + const { size } = await stat(this.tmpPath) + + this.size = size + + const fileStream = createReadStream(this.tmpPath) + this.readStream = fileStream + } + + _needDownloadFirst () { + return !this.options.size || !this.options.companionOptions.streamingUpload + } + + /** + * + * @param {Readable} stream + */ + async uploadStream (stream) { + try { + if (this.uploadStopped) throw new Error('Cannot upload stream after upload stopped') + if (this.readStream) throw new Error('Already uploading') + + this.readStream = stream + if (this._needDownloadFirst()) { + logger.debug('need to download the whole file first', 'controller.get.provider.size', this.shortToken) + // Some streams need to be downloaded entirely first, because we don't know their size from the provider + // This is true for zoom and drive (exported files) or some URL downloads. 
+ // The stream will then typically come from a "Transfer-Encoding: chunked" response + await this._downloadStreamAsFile(this.readStream) + } + if (this.uploadStopped) return + + const { url, extraData } = await Promise.race([ + this._uploadByProtocol(), + // If we don't handle stream errors, we get unhandled error in node. + new Promise((resolve, reject) => this.readStream.on('error', reject)), + ]) + this.emitSuccess(url, extraData) + } catch (err) { + if (err instanceof AbortError) { + logger.error('Aborted upload', 'uploader.aborted', this.shortToken) + return + } + // console.log(err) + logger.error(err, 'uploader.error', this.shortToken) + this.emitError(err) + } finally { + this.cleanUp() } } @@ -136,13 +246,6 @@ class Uploader { } } - /** - * the number of bytes written into the streams - */ - get bytesWritten () { - return this.writeStream.bytesWritten - } - /** * Validate the options passed down to the uplaoder * @@ -164,6 +267,11 @@ class Uploader { } } + if (exceedsMaxFileSize(options.companionOptions.maxFileSize, options.size)) { + this._errRespMessage = 'maxFileSize exceeded' + return false + } + // validate fieldname if (options.fieldname && typeof options.fieldname !== 'string') { this._errRespMessage = 'fieldname must be a string' @@ -192,13 +300,29 @@ class Uploader { // s3 uploads don't require upload destination // validation, because the destination is determined // by the server's s3 config - if (options.protocol === PROTOCOLS.s3Multipart) { - return true - } + if (options.protocol !== PROTOCOLS.s3Multipart) { + if (!options.endpoint && !options.uploadUrl) { + this._errRespMessage = 'no destination specified' + return false + } - if (!options.endpoint && !options.uploadUrl) { - this._errRespMessage = 'no destination specified' - return false + const validateUrl = (url) => { + const validatorOpts = { require_protocol: true, require_tld: false } + if (url && !validator.isURL(url, validatorOpts)) { + this._errRespMessage = 'invalid destination 
url' + return false + } + + const allowedUrls = options.companionOptions.uploadUrls + if (allowedUrls && url && !hasMatch(url, allowedUrls)) { + this._errRespMessage = 'upload destination does not match any allowed destinations' + return false + } + + return true + } + + if (![options.endpoint, options.uploadUrl].every(validateUrl)) return false } if (options.chunkSize != null && typeof options.chunkSize !== 'number') { @@ -206,21 +330,7 @@ class Uploader { return false } - const validatorOpts = { require_protocol: true, require_tld: false } - return [options.endpoint, options.uploadUrl].every((url) => { - if (url && !validator.isURL(url, validatorOpts)) { - this._errRespMessage = 'invalid destination url' - return false - } - - const allowedUrls = options.companionOptions.uploadUrls - if (allowedUrls && url && !hasMatch(url, allowedUrls)) { - this._errRespMessage = 'upload destination does not match any allowed destinations' - return false - } - - return true - }) + return true } hasError () { @@ -236,93 +346,22 @@ class Uploader { return Uploader.shortenToken(this.token) } - /** - * - * @param {Function} callback - */ - onSocketReady (callback) { - /** @type {any} */ // WriteStream.pending was added in Node.js 11.2.0 - const stream = this.writeStream - if (stream.pending) { - let connected = false - emitter().once(`connection:${this.token}`, () => { if (stream.pending) connected = true; else callback() }) - this.writeStream.once('ready', () => connected && callback()) - } else { - emitter().once(`connection:${this.token}`, () => callback()) - } - logger.debug('waiting for connection', 'uploader.socket.wait', this.shortToken) + async awaitReady () { + // TODO timeout after a while? 
Else we could leak emitters + logger.debug('waiting for socket connection', 'uploader.socket.wait', this.shortToken) + await new Promise((resolve) => emitter().once(`connection:${this.token}`, resolve)) + logger.debug('socket connection received', 'uploader.socket.wait', this.shortToken) } cleanUp () { - fs.unlink(this.path, (err) => { - if (err) { - logger.error(`cleanup failed for: ${this.path} err: ${err}`, 'uploader.cleanup.error') - } - }) + logger.debug('cleanup', this.shortToken) + if (this.readStream && !this.readStream.destroyed) this.readStream.destroy() + + if (this.tmpPath) unlink(this.tmpPath).catch(() => {}) + emitter().removeAllListeners(`pause:${this.token}`) emitter().removeAllListeners(`resume:${this.token}`) emitter().removeAllListeners(`cancel:${this.token}`) - this.uploadStopped = true - } - - /** - * - * @param {Error} err - * @param {string | Buffer | Buffer[]} chunk - */ - handleChunk (err, chunk) { - if (this.uploadStopped) { - return - } - - if (err) { - logger.error(err, 'uploader.download.error', this.shortToken) - this.emitError(err) - this.cleanUp() - return - } - - // @todo a default protocol should not be set. We should ensure that the user specifies their protocol. - const protocol = this.options.protocol || PROTOCOLS.multipart - - // The download has completed; close the file and start an upload if necessary. 
- if (chunk === null) { - this.writeStream.on('finish', () => { - this.streamsEnded = true - switch (protocol) { - case PROTOCOLS.multipart: - if (this.options.endpoint) { - this.uploadMultipart() - } - break - case PROTOCOLS.s3Multipart: - if (!this.s3Upload) { - this.uploadS3Multipart() - } else { - logger.warn('handleChunk() called multiple times', 'uploader.s3.duplicate', this.shortToken) - } - break - case PROTOCOLS.tus: - if (!this.tus) { - this.uploadTus() - } else { - logger.warn('handleChunk() called multiple times', 'uploader.tus.duplicate', this.shortToken) - } - break - } - }) - - return this.endStreams() - } - - this.writeStream.write(chunk, () => { - logger.debug(`${this.bytesWritten} bytes`, 'uploader.download.progress', this.shortToken) - return this.emitIllusiveProgress() - }) - } - - endStreams () { - this.writeStream.end() } getResponse () { @@ -342,62 +381,49 @@ class Uploader { } /** - * This method emits upload progress but also creates an "upload progress" illusion - * for the waiting period while only download is happening. Hence, it combines both - * download and upload into an upload progress. * - * @see emitProgress - * @param {number=} bytesUploaded the bytes actually Uploaded so far + * @param {number} [bytesUploaded] + * @param {number | null} [bytesTotalIn] */ - emitIllusiveProgress (bytesUploaded = 0) { - if (this._paused) { - return + onProgress (bytesUploaded = 0, bytesTotalIn = 0) { + const bytesTotal = bytesTotalIn || this.size || 0 + + // If fully downloading before uploading, combine downloaded and uploaded bytes + // This will make sure that the user sees half of the progress before upload starts (while downloading) + let combinedBytes = bytesUploaded + if (this._needDownloadFirst()) { + combinedBytes = Math.floor((combinedBytes + (this.downloadedBytes || 0)) / 2) } - let bytesTotal = this.streamsEnded ? 
this.bytesWritten : this.options.size - if (!this.streamsEnded) { - bytesTotal = Math.max(bytesTotal, this.bytesWritten) - } - // for a 10MB file, 10MB of download will account for 5MB upload progress - // and 10MB of actual upload will account for the other 5MB upload progress. - const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2) + // Prevent divide by zero + let percentage = 0 + if (bytesTotal > 0) percentage = Math.min(Math.max(0, ((combinedBytes / bytesTotal) * 100)), 100) + const formattedPercentage = percentage.toFixed(2) logger.debug( - `${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`, - 'uploader.illusive.progress', + `${combinedBytes} ${bytesTotal} ${formattedPercentage}%`, + 'uploader.total.progress', this.shortToken ) - this.emitProgress(illusiveBytesUploaded, bytesTotal) - } - /** - * - * @param {number} bytesUploaded - * @param {number | null} bytesTotal - */ - emitProgress (bytesUploaded, bytesTotal) { - bytesTotal = bytesTotal || this.options.size - if (this.tus && this.tus.options.uploadLengthDeferred && this.streamsEnded) { - bytesTotal = this.bytesWritten + if (this._paused || this.uploadStopped) { + return } - const percentage = (bytesUploaded / bytesTotal * 100) - const formatPercentage = percentage.toFixed(2) - logger.debug( - `${bytesUploaded} ${bytesTotal} ${formatPercentage}%`, - 'uploader.upload.progress', - this.shortToken - ) + const payload = { progress: formattedPercentage, bytesUploaded: combinedBytes, bytesTotal } const dataToEmit = { action: 'progress', - payload: { progress: formatPercentage, bytesUploaded, bytesTotal }, + payload, } this.saveState(dataToEmit) + const isEqual = (p1, p2) => (p1.progress === p2.progress + && p1.bytesUploaded === p2.bytesUploaded + && p1.bytesTotal === p2.bytesTotal) + // avoid flooding the client with progress events. 
- const roundedPercentage = Math.floor(percentage) - if (this.emittedProgress !== roundedPercentage) { - this.emittedProgress = roundedPercentage + if (!isEqual(this.emittedProgress, payload)) { + this.emittedProgress = payload emitter().emit(this.token, dataToEmit) } } @@ -419,15 +445,15 @@ class Uploader { /** * * @param {Error} err - * @param {object=} extraData */ - emitError (err, extraData = {}) { + emitError (err) { const serializedErr = serializeError(err) // delete stack to avoid sending server info to client delete serializedErr.stack const dataToEmit = { action: 'error', - payload: Object.assign(extraData, { error: serializedErr }), + // @ts-ignore + payload: Object.assign(err.extraData || {}, { error: serializedErr }), } this.saveState(dataToEmit) emitter().emit(this.token, dataToEmit) @@ -435,152 +461,152 @@ class Uploader { /** * start the tus upload + * + * @param {any} stream */ - uploadTus () { - const file = fs.createReadStream(this.path) + async _uploadTus (stream) { const uploader = this - this.tus = new tus.Upload(file, { - endpoint: this.options.endpoint, - uploadUrl: this.options.uploadUrl, - uploadLengthDeferred: false, - retryDelays: [0, 1000, 3000, 5000], - uploadSize: this.bytesWritten, - chunkSize: this.options.chunkSize || Infinity, - headers: headerSanitize(this.options.headers), - addRequestId: true, - metadata: { - // file name and type as required by the tusd tus server - // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646 - filename: this.uploadFileName, - filetype: this.options.metadata.type, - ...this.options.metadata, - }, - /** - * - * @param {Error} error - */ - onError (error) { - logger.error(error, 'uploader.tus.error') - // deleting tus originalRequest field because it uses the same http-agent - // as companion, and this agent may contain sensitive request details (e.g headers) - // previously made to providers. 
Deleting the field would prevent it from getting leaked - // to the frontend etc. - // @ts-ignore - delete error.originalRequest - // @ts-ignore - delete error.originalResponse - uploader.emitError(error) - }, - /** - * - * @param {number} bytesUploaded - * @param {number} bytesTotal - */ - onProgress (bytesUploaded, bytesTotal) { // eslint-disable-line no-unused-vars - uploader.emitIllusiveProgress(bytesUploaded) - }, - onSuccess () { - uploader.emitSuccess(uploader.tus.url) - uploader.cleanUp() - }, - }) + const isFileStream = stream instanceof ReadStream + // chunkSize needs to be a finite value if the stream is not a file stream (fs.createReadStream) + // https://github.com/tus/tus-js-client/blob/4479b78032937ac14da9b0542e489ac6fe7e0bc7/lib/node/fileReader.js#L50 + const chunkSize = this.options.chunkSize || (isFileStream ? Infinity : 50e6) + + return new Promise((resolve, reject) => { + this.tus = new tus.Upload(stream, { + endpoint: this.options.endpoint, + uploadUrl: this.options.uploadUrl, + uploadLengthDeferred: false, + retryDelays: [0, 1000, 3000, 5000], + uploadSize: this.size, + chunkSize, + headers: headerSanitize(this.options.headers), + addRequestId: true, + metadata: { + // file name and type as required by the tusd tus server + // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646 + filename: this.uploadFileName, + filetype: this.options.metadata.type, + ...this.options.metadata, + }, + /** + * + * @param {Error} error + */ + onError (error) { + logger.error(error, 'uploader.tus.error') + // deleting tus originalRequest field because it uses the same http-agent + // as companion, and this agent may contain sensitive request details (e.g headers) + // previously made to providers. Deleting the field would prevent it from getting leaked + // to the frontend etc. 
+ // @ts-ignore + delete error.originalRequest + // @ts-ignore + delete error.originalResponse + reject(error) + }, + /** + * + * @param {number} [bytesUploaded] + * @param {number} [bytesTotal] + */ + onProgress (bytesUploaded, bytesTotal) { + uploader.onProgress(bytesUploaded, bytesTotal) + }, + onSuccess () { + resolve({ url: uploader.tus.url }) + }, + }) - if (!this._paused) { - this.tus.start() - } + if (!this._paused) { + this.tus.start() + } + }) } - uploadMultipart () { - const file = fs.createReadStream(this.path) + async _uploadMultipart (stream) { + if (!this.options.endpoint) { + throw new Error('No multipart endpoint set') + } // upload progress let bytesUploaded = 0 - file.on('data', (data) => { + stream.on('data', (data) => { bytesUploaded += data.length - this.emitIllusiveProgress(bytesUploaded) + this.onProgress(bytesUploaded, undefined) }) const httpMethod = (this.options.httpMethod || '').toLowerCase() === 'put' ? 'put' : 'post' const headers = headerSanitize(this.options.headers) const reqOptions = { url: this.options.endpoint, headers, encoding: null } - const httpRequest = request[httpMethod] + const runRequest = request[httpMethod] + if (this.options.useFormData) { reqOptions.formData = { - ...this.options.metadata, [this.options.fieldname]: { - value: file, + value: stream, options: { filename: this.uploadFileName, contentType: this.options.metadata.type, + knownLength: this.size, }, }, } - - httpRequest(reqOptions, (error, response, body) => { - this._onMultipartComplete(error, response, body, bytesUploaded) - }) } else { - reqOptions.headers['content-length'] = this.bytesWritten - reqOptions.body = file - httpRequest(reqOptions, (error, response, body) => { - this._onMultipartComplete(error, response, body, bytesUploaded) - }) + reqOptions.headers['content-length'] = this.size + reqOptions.body = stream } - } - _onMultipartComplete (error, response, body, bytesUploaded) { - if (error) { - logger.error(error, 'upload.multipart.error') - 
this.emitError(error) - return - } - const { headers } = response + const { response, body } = await new Promise((resolve, reject) => { + runRequest(reqOptions, (error, response2, body2) => { + if (error) { + logger.error(error, 'upload.multipart.error') + reject(error) + return + } + + resolve({ response: response2, body: body2 }) + }) + }) + // remove browser forbidden headers - delete headers['set-cookie'] - delete headers['set-cookie2'] + delete response.headers['set-cookie'] + delete response.headers['set-cookie2'] const respObj = { responseText: body.toString(), status: response.statusCode, statusText: response.statusMessage, - headers, + headers: response.headers, } if (response.statusCode >= 400) { logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipart.error') - this.emitError(new Error(response.statusMessage), respObj) - } else if (bytesUploaded !== this.bytesWritten && bytesUploaded !== this.options.size) { - const errMsg = `uploaded only ${bytesUploaded} of ${this.bytesWritten} with status: ${response.statusCode}` + const err = new Error(response.statusMessage) + // @ts-ignore + err.extraData = respObj + throw err + } + + if (bytesUploaded !== this.size) { + const errMsg = `uploaded only ${bytesUploaded} of ${this.size} with status: ${response.statusCode}` logger.error(errMsg, 'upload.multipart.mismatch.error') - this.emitError(new Error(errMsg)) - } else { - this.emitSuccess(null, { response: respObj, bytesUploaded }) + throw new Error(errMsg) } - this.cleanUp() + return { url: null, extraData: { response: respObj, bytesUploaded } } } /** * Upload the file to S3 using a Multipart upload. */ - uploadS3Multipart () { - const file = fs.createReadStream(this.path) - - return this._uploadS3MultipartStream(file) - } - - /** - * Upload a stream to S3. 
- */ - _uploadS3MultipartStream (stream) { + async _uploadS3Multipart (stream) { if (!this.options.s3) { - this.emitError(new Error('The S3 client is not configured on this companion instance.')) - return + throw new Error('The S3 client is not configured on this companion instance.') } - const filename = this.options.metadata.name || path.basename(this.path) + const filename = this.uploadFileName const { client, options } = this.options.s3 const upload = client.upload({ @@ -592,28 +618,29 @@ class Uploader { Body: stream, }) - this.s3Upload = upload - upload.on('httpUploadProgress', ({ loaded, total }) => { - this.emitProgress(loaded, total) + this.onProgress(loaded, total) }) - upload.send((error, data) => { - this.s3Upload = null - if (error) { - this.emitError(error) - } else { - const url = data && data.Location ? data.Location : null - this.emitSuccess(url, { - response: { - responseText: JSON.stringify(data), - headers: { - 'content-type': 'application/json', + return new Promise((resolve, reject) => { + upload.send((error, data) => { + if (error) { + reject(error) + return + } + + resolve({ + url: data && data.Location ? data.Location : null, + extraData: { + response: { + responseText: JSON.stringify(data), + headers: { + 'content-type': 'application/json', + }, }, }, }) - } - this.cleanUp() + }) }) } } diff --git a/src/server/controllers/deauth-callback.js b/src/server/controllers/deauth-callback.js index 8037d7fcd4..0157b90673 100644 --- a/src/server/controllers/deauth-callback.js +++ b/src/server/controllers/deauth-callback.js @@ -1,20 +1,22 @@ const { errorToResponse } = require('../provider/error') -function deauthCallback ({ body, companion, headers }, res, next) { +async function deauthCallback ({ body, companion, headers }, res, next) { // we need the provider instance to decide status codes because // this endpoint does not cater to a uniform client. // It doesn't respond to Uppy client like other endpoints. 
// Instead it responds to the providers themselves. - companion.provider.deauthorizationCallback({ companion, body, headers }, (err, data, status) => { - if (err) { - const errResp = errorToResponse(err) - if (errResp) { - return res.status(errResp.code).json({ message: errResp.message }) - } - return next(err) + try { + const { data, status } = await companion.provider.deauthorizationCallback({ companion, body, headers }) + res.status(status || 200).json(data) + return + } catch (err) { + const errResp = errorToResponse(err) + if (errResp) { + res.status(errResp.code).json({ message: errResp.message }) + return } - return res.status(status || 200).json(data) - }) + next(err) + } } module.exports = deauthCallback diff --git a/src/server/controllers/get.js b/src/server/controllers/get.js index 41dbb24c66..df11673151 100644 --- a/src/server/controllers/get.js +++ b/src/server/controllers/get.js @@ -1,47 +1,26 @@ -const Uploader = require('../Uploader') const logger = require('../logger') -const { errorToResponse } = require('../provider/error') +const { startDownUpload } = require('../helpers/upload') -function get (req, res, next) { +async function get (req, res) { const { id } = req.params const token = req.companion.providerToken const { provider } = req.companion - // get the file size before proceeding - provider.size({ id, token, query: req.query }, (err, size) => { - if (err) { - const errResp = errorToResponse(err) - if (errResp) { - return res.status(errResp.code).json({ message: errResp.message }) - } - return next(err) - } + async function getSize () { + return provider.size({ id, token, query: req.query }) + } - if (!size) { - logger.error('unable to determine file size', 'controller.get.provider.size', req.id) - return res.status(400).json({ message: 'unable to determine file size' }) - } + async function download () { + const { stream } = await provider.download({ id, token, query: req.query }) + return stream + } - logger.debug('Instantiating 
uploader.', null, req.id) - const uploader = new Uploader(Uploader.reqToOptions(req, size)) + function onUnhandledError (err) { + logger.error(err, 'controller.get.error', req.id) + res.status(400).json({ message: 'Failed to download file' }) + } - if (uploader.hasError()) { - const response = uploader.getResponse() - res.status(response.status).json(response.body) - return - } - - // wait till the client has connected to the socket, before starting - // the download, so that the client can receive all download/upload progress. - logger.debug('Waiting for socket connection before beginning remote download.', null, req.id) - // waiting for socketReady. - uploader.onSocketReady(() => { - logger.debug('Socket connection received. Starting remote download.', null, req.id) - provider.download({ id, token, query: req.query }, uploader.handleChunk.bind(uploader)) - }) - const response = uploader.getResponse() - res.status(response.status).json(response.body) - }) + startDownUpload({ req, res, getSize, download, onUnhandledError }) } module.exports = get diff --git a/src/server/controllers/list.js b/src/server/controllers/list.js index 2d096f6675..f539442548 100644 --- a/src/server/controllers/list.js +++ b/src/server/controllers/list.js @@ -1,18 +1,19 @@ const { errorToResponse } = require('../provider/error') -function list ({ query, params, companion }, res, next) { +async function list ({ query, params, companion }, res, next) { const token = companion.providerToken - companion.provider.list({ companion, token, directory: params.id, query }, (err, data) => { - if (err) { - const errResp = errorToResponse(err) - if (errResp) { - return res.status(errResp.code).json({ message: errResp.message }) - } - return next(err) + try { + const data = await companion.provider.list({ companion, token, directory: params.id, query }) + res.json(data) + } catch (err) { + const errResp = errorToResponse(err) + if (errResp) { + res.status(errResp.code).json({ message: errResp.message }) 
+ return } - return res.json(data) - }) + next(err) + } } module.exports = list diff --git a/src/server/controllers/logout.js b/src/server/controllers/logout.js index 7a67b68dce..db9bf9a9bd 100644 --- a/src/server/controllers/logout.js +++ b/src/server/controllers/logout.js @@ -6,7 +6,7 @@ const { errorToResponse } = require('../provider/error') * @param {object} req * @param {object} res */ -function logout (req, res, next) { +async function logout (req, res, next) { const cleanSession = () => { if (req.session.grant) { req.session.grant.state = null @@ -16,24 +16,26 @@ function logout (req, res, next) { const { providerName } = req.params const { companion } = req const token = companion.providerTokens ? companion.providerTokens[providerName] : null - if (token) { - companion.provider.logout({ token, companion }, (err, data) => { - if (err) { - const errResp = errorToResponse(err) - if (errResp) { - return res.status(errResp.code).json({ message: errResp.message }) - } - return next(err) - } - delete companion.providerTokens[providerName] - tokenService.removeFromCookies(res, companion.options, companion.provider.authProvider) - cleanSession() - res.json({ ok: true, ...data }) - }) - } else { + if (!token) { cleanSession() res.json({ ok: true, revoked: false }) + return + } + + try { + const data = await companion.provider.logout({ token, companion }) + delete companion.providerTokens[providerName] + tokenService.removeFromCookies(res, companion.options, companion.provider.authProvider) + cleanSession() + res.json({ ok: true, ...data }) + } catch (err) { + const errResp = errorToResponse(err) + if (errResp) { + res.status(errResp.code).json({ message: errResp.message }) + return + } + next(err) } } diff --git a/src/server/controllers/thumbnail.js b/src/server/controllers/thumbnail.js index 7fa670b5cc..1fe4d95343 100644 --- a/src/server/controllers/thumbnail.js +++ b/src/server/controllers/thumbnail.js @@ -3,22 +3,23 @@ * @param {object} req * @param {object} res 
*/ -function thumbnail (req, res, next) { +async function thumbnail (req, res, next) { const { providerName } = req.params const { id } = req.params const token = req.companion.providerTokens[providerName] const { provider } = req.companion - provider.thumbnail({ id, token }, (err, response) => { - if (err) { - if (err.isAuthError) res.sendStatus(401) - else next(err) - } else if (response) { + try { + const response = await provider.thumbnail({ id, token }) + if (response) { response.pipe(res) } else { res.sendStatus(404) } - }) + } catch (err) { + if (err.isAuthError) res.sendStatus(401) + else next(err) + } } module.exports = thumbnail diff --git a/src/server/controllers/url.js b/src/server/controllers/url.js index b5c33ee315..b7bc41c5fe 100644 --- a/src/server/controllers/url.js +++ b/src/server/controllers/url.js @@ -3,7 +3,7 @@ const request = require('request') const { URL } = require('url') const validator = require('validator') -const Uploader = require('../Uploader') +const { startDownUpload } = require('../helpers/upload') const { getURLMeta, getRedirectEvaluator, getProtectedHttpAgent } = require('../helpers/request') const logger = require('../logger') @@ -41,11 +41,11 @@ const validateURL = (url, debug) => { * to the callback chunk by chunk. 
* * @param {string} url - * @param {downloadCallback} onDataChunk * @param {boolean} blockLocalIPs * @param {string} traceId + * @returns {Promise} */ -const downloadURL = (url, onDataChunk, blockLocalIPs, traceId) => { +const downloadURL = async (url, blockLocalIPs, traceId) => { const opts = { uri: url, method: 'GET', @@ -53,20 +53,25 @@ const downloadURL = (url, onDataChunk, blockLocalIPs, traceId) => { agentClass: getProtectedHttpAgent((new URL(url)).protocol, blockLocalIPs), } - request(opts) - .on('response', (resp) => { - if (resp.statusCode >= 300) { - const err = new Error(`URL server responded with status: ${resp.statusCode}`) - onDataChunk(err, null) - } else { - resp.on('data', (chunk) => onDataChunk(null, chunk)) - } - }) - .on('end', () => onDataChunk(null, null)) - .on('error', (err) => { - logger.error(err, 'controller.url.download.error', traceId) - onDataChunk(err, null) - }) + return new Promise((resolve, reject) => { + const req = request(opts) + .on('response', (resp) => { + if (resp.statusCode >= 300) { + req.abort() // No need to keep request + reject(new Error(`URL server responded with status: ${resp.statusCode}`)) + return + } + + // Don't allow any more data to flow yet. + // https://github.com/request/request/issues/1990#issuecomment-184712275 + resp.pause() + resolve(resp) + }) + .on('error', (err) => { + logger.error(err, 'controller.url.download.error', traceId) + reject(err) + }) + }) } /** @@ -101,42 +106,30 @@ const meta = async (req, res) => { * @param {object} res expressJS response object */ const get = async (req, res) => { - try { - logger.debug('URL file import handler running', null, req.id) - const { debug } = req.companion.options - if (!validateURL(req.body.url, debug)) { - logger.debug('Invalid request body detected. 
Exiting url import handler.', null, req.id) - res.status(400).json({ error: 'Invalid request body' }) - return - } + logger.debug('URL file import handler running', null, req.id) + const { debug } = req.companion.options + if (!validateURL(req.body.url, debug)) { + logger.debug('Invalid request body detected. Exiting url import handler.', null, req.id) + res.status(400).json({ error: 'Invalid request body' }) + return + } + async function getSize () { const { size } = await getURLMeta(req.body.url, !debug) + return size + } - // @ts-ignore - logger.debug('Instantiating uploader.', null, req.id) - const uploader = new Uploader(Uploader.reqToOptions(req, size)) - - if (uploader.hasError()) { - const response = uploader.getResponse() - res.status(response.status).json(response.body) - return - } - - logger.debug('Waiting for socket connection before beginning remote download.', null, req.id) - uploader.onSocketReady(() => { - logger.debug('Socket connection received. Starting remote download.', null, req.id) - downloadURL(req.body.url, uploader.handleChunk.bind(uploader), !debug, req.id) - }) - - const response = uploader.getResponse() + async function download () { + return downloadURL(req.body.url, !debug, req.id) + } - // NOTE: Uploader will continue running after the http request is responded - res.status(response.status).json(response.body) - } catch (err) { - logger.error(err, 'controller.url.get.error', req.id) + function onUnhandledError (err) { + logger.error(err, 'controller.url.error', req.id) // @todo send more meaningful error message and status code to client if possible res.status(err.status || 500).json({ message: 'failed to fetch URL metadata' }) } + + startDownUpload({ req, res, getSize, download, onUnhandledError }) } module.exports = () => router() diff --git a/src/server/helpers/request.js b/src/server/helpers/request.js index da312cd962..aa0b83565c 100644 --- a/src/server/helpers/request.js +++ b/src/server/helpers/request.js @@ -187,9 +187,12 @@ 
exports.getURLMeta = (url, blockLocalIPs = false) => { reject(new Error(`URL server responded with status: ${response.statusCode}`)) } else { req.abort() // No need to get the rest of the response, as we only want header + + // Can be undefined for unknown length URLs, e.g. transfer-encoding: chunked + const contentLength = parseInt(response.headers['content-length'], 10) resolve({ type: response.headers['content-type'], - size: parseInt(response.headers['content-length'], 10), + size: Number.isNaN(contentLength) ? null : contentLength, }) } }) diff --git a/src/server/helpers/upload.js b/src/server/helpers/upload.js new file mode 100644 index 0000000000..5e2a45c162 --- /dev/null +++ b/src/server/helpers/upload.js @@ -0,0 +1,46 @@ +const Uploader = require('../Uploader') +const logger = require('../logger') +const { errorToResponse } = require('../provider/error') + +async function startDownUpload ({ req, res, getSize, download, onUnhandledError }) { + try { + const size = await getSize() + + logger.debug('Instantiating uploader.', null, req.id) + const uploader = new Uploader(Uploader.reqToOptions(req, size)) + + if (uploader.hasError()) { + const response = uploader.getResponse() + res.status(response.status).json(response.body) + return + } + + const stream = await download() + + // "Forking" off the upload operation to background, so we can return the http request: + ;(async () => { + // wait till the client has connected to the socket, before starting + // the download, so that the client can receive all download/upload progress. + logger.debug('Waiting for socket connection before beginning remote download/upload.', null, req.id) + await uploader.awaitReady() + logger.debug('Socket connection received. 
Starting remote download/upload.', null, req.id) + + await uploader.uploadStream(stream) + })().catch((err) => logger.error(err)) + + // Respond the request + // NOTE: Uploader will continue running after the http request is responded + const response = uploader.getResponse() + res.status(response.status).json(response.body) + } catch (err) { + const errResp = errorToResponse(err) + if (errResp) { + res.status(errResp.code).json({ message: errResp.message }) + return + } + + onUnhandledError(err) + } +} + +module.exports = { startDownUpload } diff --git a/src/server/helpers/utils.js b/src/server/helpers/utils.js index 12390ff33e..fe3f5daa2e 100644 --- a/src/server/helpers/utils.js +++ b/src/server/helpers/utils.js @@ -130,3 +130,26 @@ module.exports.decrypt = (encrypted, secret) => { decrypted += decipher.final('utf8') return decrypted } + +// This is a helper that will wait for the headers of a request, +// then it will pause the response, so that the stream is ready to be attached/piped in the uploader. +// If we don't pause it will lose some data. +module.exports.requestStream = async (req, convertResponseToError) => { + const resp = await new Promise((resolve, reject) => ( + req + .on('response', (response) => { + // Don't allow any more data to flow yet. 
+ // https://github.com/request/request/issues/1990#issuecomment-184712275 + response.pause() + resolve(response) + }) + .on('error', reject) + )) + + if (resp.statusCode !== 200) { + req.abort() // Or we will leak memory (the stream is paused) + throw await convertResponseToError(resp) + } + + return { stream: resp } +} diff --git a/src/server/provider/Provider.js b/src/server/provider/Provider.js index ccf10516d0..58daf68e2d 100644 --- a/src/server/provider/Provider.js +++ b/src/server/provider/Provider.js @@ -22,9 +22,9 @@ class Provider { * list the files and folders in the provider account * * @param {object} options - * @param {Function} cb + * @returns {Promise} */ - list (options, cb) { // eslint-disable-line no-unused-vars + async list (options) { // eslint-disable-line no-unused-vars throw new Error('method not implemented') } @@ -32,9 +32,9 @@ class Provider { * download a certain file from the provider account * * @param {object} options - * @param {Function} cb + * @returns {Promise} */ - download (options, cb) { // eslint-disable-line no-unused-vars + async download (options) { // eslint-disable-line no-unused-vars throw new Error('method not implemented') } @@ -42,9 +42,9 @@ class Provider { * return a thumbnail for a provider file * * @param {object} options - * @param {Function} cb + * @returns {Promise} */ - thumbnail (options, cb) { // eslint-disable-line no-unused-vars + async thumbnail (options) { // eslint-disable-line no-unused-vars throw new Error('method not implemented') } @@ -52,9 +52,9 @@ class Provider { * get the size of a certain file in the provider account * * @param {object} options - * @param {Function} cb + * @returns {Promise} */ - size (options, cb) { // eslint-disable-line no-unused-vars + async size (options) { // eslint-disable-line no-unused-vars throw new Error('method not implemented') } @@ -62,9 +62,9 @@ class Provider { * handle deauthorization notification from oauth providers * * @param {object} options - * @param 
{Function} cb + * @returns {Promise} */ - deauthorizationCallback (options, cb) { // eslint-disable-line no-unused-vars + async deauthorizationCallback (options) { // eslint-disable-line no-unused-vars // @todo consider doing something like cb(new NotImplementedError()) instead throw new Error('method not implemented') } @@ -77,4 +77,6 @@ class Provider { } } +Provider.version = 1 + module.exports = Provider diff --git a/src/server/provider/ProviderCompat.js b/src/server/provider/ProviderCompat.js new file mode 100644 index 0000000000..c57532a24a --- /dev/null +++ b/src/server/provider/ProviderCompat.js @@ -0,0 +1,54 @@ +const { promisify } = require('util') + +const Stream = require('stream') + +/** + * Backward compatibility layer for old provider API using callbacks and onData cb + * + * @returns {any} + */ +const wrapLegacyProvider = (legacyProvider) => { + class CompatProvider extends legacyProvider { + constructor (...args) { + super(...args) + + this.list = promisify((options, cb) => super.list(options, cb)) + this.size = promisify((options, cb) => super.size(options, cb)) + this.thumbnail = promisify((options, cb) => super.thumbnail(options, cb)) + this.deauthorizationCallback = promisify((options, cb) => super.deauthorizationCallback(options, cb)) + this.logout = promisify((options, cb) => super.logout(options, cb)) + + const superDownload = super.download + + this.download = async (options) => { + let stream + + return new Promise((resolve, reject) => { + superDownload(options, (err, chunk) => { + if (err) { + if (stream && !stream.destroyed) stream.destroy(err) + reject(err) + return + } + + // Initialize on first chunk + if (chunk != null && !stream) { + stream = new Stream.PassThrough() + // stream.on('end', () => console.log('stream end')) + stream.pause() + stream.push(chunk) + resolve({ stream }) + return + } + + stream.push(chunk) + }) + }) + } + } + } + + return CompatProvider +} + +module.exports = { wrapLegacyProvider } diff --git 
a/src/server/provider/SearchProvider.js b/src/server/provider/SearchProvider.js index b2ea5e576f..28b78b7752 100644 --- a/src/server/provider/SearchProvider.js +++ b/src/server/provider/SearchProvider.js @@ -6,9 +6,9 @@ class SearchProvider { * list the files available based on the search query * * @param {object} options - * @param {Function} cb + * @returns {Promise} */ - list (options, cb) { // eslint-disable-line no-unused-vars + async list (options) { // eslint-disable-line no-unused-vars throw new Error('method not implemented') } @@ -16,9 +16,9 @@ class SearchProvider { * download a certain file from the provider files * * @param {object} options - * @param {Function} cb + * @returns {Promise} */ - download (options, cb) { // eslint-disable-line no-unused-vars + async download (options) { // eslint-disable-line no-unused-vars throw new Error('method not implemented') } @@ -26,11 +26,13 @@ class SearchProvider { * get the size of a certain file in the provider files * * @param {object} options - * @param {Function} cb + * @returns {Promise} */ - size (options, cb) { // eslint-disable-line no-unused-vars + async size (options) { // eslint-disable-line no-unused-vars throw new Error('method not implemented') } } +SearchProvider.version = 1 + module.exports = SearchProvider diff --git a/src/server/provider/box/index.js b/src/server/provider/box/index.js index 2d5b4c7c7f..8edd2dc37b 100644 --- a/src/server/provider/box/index.js +++ b/src/server/provider/box/index.js @@ -1,10 +1,12 @@ -const Provider = require('../Provider') - const request = require('request') const purest = require('purest')({ request }) +const { promisify } = require('util') + +const Provider = require('../Provider') const logger = require('../../logger') const adapter = require('./adapter') const { ProviderApiError, ProviderAuthError } = require('../error') +const { requestStream } = require('../../helpers/utils') const BOX_FILES_FIELDS = 'id,modified_at,name,permissions,size,type' const 
BOX_THUMBNAIL_SIZE = 256 @@ -41,7 +43,7 @@ class Box extends Provider { * @param {object} options * @param {Function} done */ - list ({ directory, token, query, companion }, done) { + _list ({ directory, token, query, companion }, done) { const rootFolderID = '0' const path = `folders/${directory || rootFolderID}/items` @@ -66,26 +68,20 @@ class Box extends Provider { }) } - download ({ id, token }, onData) { - return this.client - .get(`files/${id}/content`) - .auth(token) - .request() - .on('response', (resp) => { - if (resp.statusCode !== 200) { - onData(this._error(null, resp)) - } else { - resp.on('data', (chunk) => onData(null, chunk)) - } - }) - .on('end', () => onData(null, null)) - .on('error', (err) => { - logger.error(err, 'provider.box.download.error') - onData(err) - }) + async download ({ id, token }) { + try { + const req = this.client + .get(`files/${id}/content`) + .auth(token) + .request() + return await requestStream(req, async (res) => this._error(null, res)) + } catch (err) { + logger.error(err, 'provider.box.download.error') + throw err + } } - thumbnail ({ id, token }, done) { + _thumbnail ({ id, token }, done) { return this.client .get(`files/${id}/thumbnail.png`) .qs({ max_height: BOX_THUMBNAIL_SIZE, max_width: BOX_THUMBNAIL_SIZE }) @@ -120,7 +116,7 @@ class Box extends Provider { }) } - size ({ id, token }, done) { + _size ({ id, token }, done) { return this.client .get(`files/${id}`) .auth(token) @@ -130,11 +126,11 @@ class Box extends Provider { logger.error(err, 'provider.box.size.error') return done(err) } - done(null, parseInt(body.size)) + done(null, parseInt(body.size, 10)) }) } - logout ({ companion, token }, done) { + _logout ({ companion, token }, done) { const { key, secret } = companion.options.providerOptions.box return this.client @@ -190,4 +186,11 @@ class Box extends Provider { } } +Box.version = 2 + +Box.prototype.list = promisify(Box.prototype._list) +Box.prototype.thumbnail = promisify(Box.prototype._thumbnail) 
+Box.prototype.size = promisify(Box.prototype._size) +Box.prototype.logout = promisify(Box.prototype._logout) + module.exports = Box diff --git a/src/server/provider/drive/index.js b/src/server/provider/drive/index.js index ef8d8231cc..fc412d36da 100644 --- a/src/server/provider/drive/index.js +++ b/src/server/provider/drive/index.js @@ -1,12 +1,13 @@ /* eslint-disable no-underscore-dangle */ -const { callbackify } = require('util') const request = require('request') const purest = require('purest')({ request }) +const { promisify } = require('util') const Provider = require('../Provider') const logger = require('../../logger') const adapter = require('./adapter') const { ProviderApiError, ProviderAuthError } = require('../error') +const { requestStream } = require('../../helpers/utils') const DRIVE_FILE_FIELDS = 'kind,id,imageMediaMetadata,name,mimeType,ownedByMe,permissions(role,emailAddress),size,modifiedTime,iconLink,thumbnailLink,teamDriveId,videoMediaMetadata,shortcutDetails(targetId,targetMimeType)' const DRIVE_FILES_FIELDS = `kind,nextPageToken,incompleteSearch,files(${DRIVE_FILE_FIELDS})` @@ -20,19 +21,15 @@ function sortByName (first, second) { return first.name.localeCompare(second.name) } -function waitForFailedResponse (resp) { - return new Promise((resolve, reject) => { +async function waitForFailedResponse (resp) { + const buf = await new Promise((resolve) => { let data = '' resp.on('data', (chunk) => { data += chunk - }).on('end', () => { - try { - resolve(JSON.parse(data.toString())) - } catch (error) { - reject(error) - } - }) + }).on('end', () => resolve(data)) + resp.resume() }) + return JSON.parse(buf.toString()) } function adaptData (listFilesResp, sharedDrivesResp, directory, query, showSharedWithMe) { @@ -104,7 +101,7 @@ class Drive extends Provider { return 'google' } - async _list (options) { + async list (options) { const directory = options.directory || 'root' const query = options.query || {} @@ -180,12 +177,7 @@ class Drive extends 
Provider { ) } - list (options, done) { - // @ts-ignore - callbackify(this._list.bind(this))(options, done) - } - - async stats ({ id, token }) { + async _stats ({ id, token }) { const getStats = async (statsOfId) => new Promise((resolve, reject) => { this.client .query() @@ -217,79 +209,59 @@ class Drive extends Provider { .request() } - download ({ id: idIn, token }, onData) { - this.stats({ id: idIn, token }) - .then(({ mimeType, id }) => { - let requestStream - if (adapter.isGsuiteFile(mimeType)) { - requestStream = this._exportGsuiteFile(id, token, adapter.getGsuiteExportType(mimeType)) - } else { - requestStream = this.client - .query() - .get(`files/${encodeURIComponent(id)}`) - .qs({ alt: 'media', supportsAllDrives: true }) - .auth(token) - .request() - } + async download ({ id: idIn, token }) { + try { + const { mimeType, id } = await this._stats({ id: idIn, token }) - requestStream - .on('response', (resp) => { - if (resp.statusCode !== 200) { - waitForFailedResponse(resp) - .then((jsonResp) => { - onData(this._error(null, { ...resp, body: jsonResp })) - }) - .catch((err2) => onData(this._error(err2, resp))) - } else { - resp.on('data', (chunk) => onData(null, chunk)) - } - }) - .on('end', () => onData(null, null)) - .on('error', (err2) => { - logger.error(err2, 'provider.drive.download.error') - onData(err2) - }) - }) - .catch((err) => { - logger.error(err, 'provider.drive.download.stats.error') - onData(err) + const req = adapter.isGsuiteFile(mimeType) + ? 
this._exportGsuiteFile(id, token, adapter.getGsuiteExportType(mimeType)) + : this.client + .query() + .get(`files/${encodeURIComponent(id)}`) + .qs({ alt: 'media', supportsAllDrives: true }) + .auth(token) + .request() + + return await requestStream(req, async (res) => { + try { + const jsonResp = await waitForFailedResponse(res) + return this._error(null, { ...res, body: jsonResp }) + } catch (err2) { + return this._error(err2, res) + } }) + } catch (err) { + logger.error(err, 'provider.drive.download.error') + throw err + } } // eslint-disable-next-line class-methods-use-this - thumbnail (_, done) { + async thumbnail () { // not implementing this because a public thumbnail from googledrive will be used instead - const err = new Error('call to thumbnail is not implemented') - logger.error(err, 'provider.drive.thumbnail.error') - return done(err) + logger.error('call to thumbnail is not implemented', 'provider.drive.thumbnail.error') + throw new Error('call to thumbnail is not implemented') } - async _size ({ id, token }) { + async size ({ id, token }) { try { - const body = await this.stats({ id, token }) - - if (adapter.isGsuiteFile(body.mimeType)) { - // Not all GSuite file sizes can be predetermined - // also for files whose size can be predetermined, - // the request to get it can be sometimes expesnive, depending - // on the file size. So we default the size to the size export limit - const maxExportFileSize = 10 * 1024 * 1024 // 10 MB - return maxExportFileSize + const { mimeType, size } = await this._stats({ id, token }) + + if (adapter.isGsuiteFile(mimeType)) { + // GSuite file sizes cannot be predetermined (but are max 10MB) + // e.g. 
Transfer-Encoding: chunked + return undefined } - return parseInt(body.size, 10) + + return parseInt(size, 10) } catch (err) { logger.error(err, 'provider.drive.size.error') throw err } } - size (options, done) { - // @ts-ignore - callbackify(this._size.bind(this))(options, done) - } - - logout ({ token }, done) { - return this.client + _logout ({ token }, done) { + this.client .get('https://accounts.google.com/o/oauth2/revoke') .qs({ token }) .request((err, resp) => { @@ -312,4 +284,8 @@ class Drive extends Provider { } } +Drive.version = 2 + +Drive.prototype.logout = promisify(Drive.prototype._logout) + module.exports = Drive diff --git a/src/server/provider/dropbox/index.js b/src/server/provider/dropbox/index.js index 051ff7914f..5fea7f00a7 100644 --- a/src/server/provider/dropbox/index.js +++ b/src/server/provider/dropbox/index.js @@ -1,10 +1,12 @@ -const Provider = require('../Provider') - const request = require('request') const purest = require('purest')({ request }) +const { promisify } = require('util') + +const Provider = require('../Provider') const logger = require('../../logger') const adapter = require('./adapter') const { ProviderApiError, ProviderAuthError } = require('../error') +const { requestStream } = require('../../helpers/utils') // From https://www.dropbox.com/developers/reference/json-encoding: // @@ -52,7 +54,7 @@ class DropBox extends Provider { * @param {object} options * @param {Function} done */ - list (options, done) { + _list (options, done) { let userInfoDone = false let statsDone = false let userInfo @@ -119,32 +121,26 @@ class DropBox extends Provider { .request(done) } - download ({ id, token }, onData) { - return this.client - .post('https://content.dropboxapi.com/2/files/download') - .options({ - version: '2', - headers: { - 'Dropbox-API-Arg': httpHeaderSafeJson({ path: `${id}` }), - }, - }) - .auth(token) - .request() - .on('response', (resp) => { - if (resp.statusCode !== 200) { - onData(this._error(null, resp)) - } else { - 
resp.on('data', (chunk) => onData(null, chunk)) - } - }) - .on('end', () => onData(null, null)) - .on('error', (err) => { - logger.error(err, 'provider.dropbox.download.error') - onData(err) - }) + async download ({ id, token }) { + try { + const req = this.client + .post('https://content.dropboxapi.com/2/files/download') + .options({ + version: '2', + headers: { + 'Dropbox-API-Arg': httpHeaderSafeJson({ path: `${id}` }), + }, + }) + .auth(token) + + return await requestStream(req, async (res) => this._error(null, res)) + } catch (err) { + logger.error(err, 'provider.dropbox.download.error') + throw err + } } - thumbnail ({ id, token }, done) { + _thumbnail ({ id, token }, done) { return this.client .post('https://content.dropboxapi.com/2/files/get_thumbnail') .options({ @@ -168,7 +164,7 @@ class DropBox extends Provider { }) } - size ({ id, token }, done) { + _size ({ id, token }, done) { return this.client .post('files/get_metadata') .options({ version: '2' }) @@ -180,11 +176,11 @@ class DropBox extends Provider { logger.error(err, 'provider.dropbox.size.error') return done(err) } - done(null, parseInt(body.size)) + done(null, parseInt(body.size, 10)) }) } - logout ({ token }, done) { + _logout ({ token }, done) { return this.client .post('auth/token/revoke') .options({ version: '2' }) @@ -232,4 +228,11 @@ class DropBox extends Provider { } } +DropBox.version = 2 + +DropBox.prototype.list = promisify(DropBox.prototype._list) +DropBox.prototype.thumbnail = promisify(DropBox.prototype._thumbnail) +DropBox.prototype.size = promisify(DropBox.prototype._size) +DropBox.prototype.logout = promisify(DropBox.prototype._logout) + module.exports = DropBox diff --git a/src/server/provider/error.js b/src/server/provider/error.js index 1a2ff6ac7c..5d053ab5d9 100644 --- a/src/server/provider/error.js +++ b/src/server/provider/error.js @@ -48,6 +48,8 @@ function errorToResponse (err) { return { code: 424, message: err.message } } } + + return undefined } module.exports = { 
ProviderAuthError, ProviderApiError, errorToResponse } diff --git a/src/server/provider/facebook/index.js b/src/server/provider/facebook/index.js index 6552748854..6e684dbdbc 100644 --- a/src/server/provider/facebook/index.js +++ b/src/server/provider/facebook/index.js @@ -1,11 +1,13 @@ -const Provider = require('../Provider') - const request = require('request') const purest = require('purest')({ request }) +const { promisify } = require('util') + +const Provider = require('../Provider') const { getURLMeta } = require('../../helpers/request') const logger = require('../../logger') const adapter = require('./adapter') const { ProviderApiError, ProviderAuthError } = require('../error') +const { requestStream } = require('../../helpers/utils') /** * Adapter for API https://developers.facebook.com/docs/graph-api/using-graph-api/ @@ -24,7 +26,7 @@ class Facebook extends Provider { return 'facebook' } - list ({ directory, token, query = { cursor: null } }, done) { + _list ({ directory, token, query = { cursor: null } }, done) { const qs = { fields: 'name,cover_photo,created_time,type', } @@ -79,43 +81,41 @@ class Facebook extends Provider { return sortedImages[sortedImages.length - 1].source } - download ({ id, token }, onData) { - return this.client - .get(`https://graph.facebook.com/${id}`) - .qs({ fields: 'images' }) - .auth(token) - .request((err, resp, body) => { - if (err || resp.statusCode !== 200) { - err = this._error(err, resp) - logger.error(err, 'provider.facebook.download.error') - onData(err) - return - } - - request(this._getMediaUrl(body)) - .on('response', (resp) => { - if (resp.statusCode !== 200) { - onData(this._error(null, resp)) - } else { - resp.on('data', (chunk) => onData(null, chunk)) + async download ({ id, token }) { + try { + const body1 = await new Promise((resolve, reject) => ( + this.client + .get(`https://graph.facebook.com/${id}`) + .qs({ fields: 'images' }) + .auth(token) + .request((err, resp, body) => { + if (err || resp.statusCode 
!== 200) { + err = this._error(err, resp) + logger.error(err, 'provider.facebook.download.error') + reject(err) + return } + resolve(body) }) - .on('end', () => onData(null, null)) - .on('error', (err) => { - logger.error(err, 'provider.facebook.download.url.error') - onData(err) - }) - }) + )) + + const url = this._getMediaUrl(body1) + const req = request(url) + return await requestStream(req, async (res) => this._error(null, res)) + } catch (err) { + logger.error(err, 'provider.facebook.download.url.error') + throw err + } } - thumbnail (_, done) { + // eslint-disable-next-line class-methods-use-this + async thumbnail () { // not implementing this because a public thumbnail from facebook will be used instead - const err = new Error('call to thumbnail is not implemented') - logger.error(err, 'provider.facebook.thumbnail.error') - return done(err) + logger.error('call to thumbnail is not implemented', 'provider.facebook.thumbnail.error') + throw new Error('call to thumbnail is not implemented') } - size ({ id, token }, done) { + _size ({ id, token }, done) { return this.client .get(`https://graph.facebook.com/${id}`) .qs({ fields: 'images' }) @@ -129,14 +129,14 @@ class Facebook extends Provider { getURLMeta(this._getMediaUrl(body)) .then(({ size }) => done(null, size)) - .catch((err) => { - logger.error(err, 'provider.facebook.size.error') - done() + .catch((err2) => { + logger.error(err2, 'provider.facebook.size.error') + done(err2) }) }) } - logout ({ token }, done) { + _logout ({ token }, done) { return this.client .delete('me/permissions') .auth(token) @@ -187,4 +187,10 @@ class Facebook extends Provider { } } +Facebook.version = 2 + +Facebook.prototype.list = promisify(Facebook.prototype._list) +Facebook.prototype.size = promisify(Facebook.prototype._size) +Facebook.prototype.logout = promisify(Facebook.prototype._logout) + module.exports = Facebook diff --git a/src/server/provider/index.js b/src/server/provider/index.js index 8cf9d66209..3c9221955b 100644 
--- a/src/server/provider/index.js +++ b/src/server/provider/index.js @@ -2,7 +2,7 @@ * @module provider */ // @ts-ignore -const config = require('@purest/providers') +const purestConfig = require('@purest/providers') const dropbox = require('./dropbox') const box = require('./box') const drive = require('./drive') @@ -18,9 +18,10 @@ const { getCredentialsResolver } = require('./credentials') const Provider = require('./Provider') // eslint-disable-next-line const SearchProvider = require('./SearchProvider') +const { wrapLegacyProvider } = require('./ProviderCompat') // leave here for now until Purest Providers gets updated with Zoom provider -config.zoom = { +purestConfig.zoom = { 'https://zoom.us/': { __domain: { auth: { @@ -44,6 +45,25 @@ config.zoom = { }, } +/** + * + * @param {{server: object}} options + */ +const validOptions = (options) => { + return options.server.host && options.server.protocol +} + +/** + * + * @param {string} name of the provider + * @param {{server: object, providerOptions: object}} options + * @returns {string} the authProvider for this provider + */ +const providerNameToAuthName = (name, options) => { // eslint-disable-line no-unused-vars + const providers = exports.getDefaultProviders() + return (providers[name] || {}).authProvider +} + /** * adds the desired provider module to the request object, * based on the providerName parameter specified @@ -60,8 +80,14 @@ module.exports.getProviderMiddleware = (providers, needsProviderCredentials) => * @param {string} providerName */ const middleware = (req, res, next, providerName) => { - if (providers[providerName] && validOptions(req.companion.options)) { - req.companion.provider = new providers[providerName]({ providerName, config }) + let ProviderClass = providers[providerName] + if (ProviderClass && validOptions(req.companion.options)) { + // TODO remove this legacy provider compatibility when we release a new major + // @ts-ignore + if (ProviderClass.version !== 2) ProviderClass = 
wrapLegacyProvider(ProviderClass) + + req.companion.provider = new ProviderClass({ providerName, config: purestConfig }) + if (needsProviderCredentials) { req.companion.getProviderCredentials = getCredentialsResolver(providerName, req.companion.options, req) } @@ -100,14 +126,17 @@ module.exports.getSearchProviders = () => { */ module.exports.addCustomProviders = (customProviders, providers, grantConfig) => { Object.keys(customProviders).forEach((providerName) => { - providers[providerName] = customProviders[providerName].module - const providerConfig = { ...customProviders[providerName].config } - // todo: consider setting these options from a universal point also used - // by official providers. It'll prevent these from getting left out if the - // requirement changes. - providerConfig.callback = `/${providerName}/callback` - providerConfig.transport = 'session' - grantConfig[providerName] = providerConfig + const customProvider = customProviders[providerName] + + providers[providerName] = customProvider.module + grantConfig[providerName] = { + ...customProvider.config, + // todo: consider setting these options from a universal point also used + // by official providers. It'll prevent these from getting left out if the + // requirement changes. 
+ callback: `/${providerName}/callback`, + transport: 'session', + } }) } @@ -164,22 +193,3 @@ module.exports.addProviderOptions = (companionOptions, grantConfig) => { } }) } - -/** - * - * @param {string} name of the provider - * @param {{server: object, providerOptions: object}} options - * @returns {string} the authProvider for this provider - */ -const providerNameToAuthName = (name, options) => { // eslint-disable-line no-unused-vars - const providers = exports.getDefaultProviders() - return (providers[name] || {}).authProvider -} - -/** - * - * @param {{server: object}} options - */ -const validOptions = (options) => { - return options.server.host && options.server.protocol -} diff --git a/src/server/provider/instagram/graph/index.js b/src/server/provider/instagram/graph/index.js index 22d06c1eb9..79b380d5cd 100644 --- a/src/server/provider/instagram/graph/index.js +++ b/src/server/provider/instagram/graph/index.js @@ -1,11 +1,13 @@ -const Provider = require('../../Provider') - const request = require('request') const purest = require('purest')({ request }) +const { promisify } = require('util') + +const Provider = require('../../Provider') const { getURLMeta } = require('../../../helpers/request') const logger = require('../../../logger') const adapter = require('./adapter') const { ProviderApiError, ProviderAuthError } = require('../../error') +const { requestStream } = require('../../../helpers/utils') /** * Adapter for API https://developers.facebook.com/docs/instagram-api/overview @@ -31,7 +33,7 @@ class Instagram extends Provider { return 'instagram' } - list ({ directory, token, query = { cursor: null } }, done) { + _list ({ directory, token, query = { cursor: null } }, done) { const qs = { fields: 'id,media_type,thumbnail_url,media_url,timestamp,children{media_type,media_url,thumbnail_url,timestamp}', } @@ -72,43 +74,39 @@ class Instagram extends Provider { }) } - download ({ id, token }, onData) { - return this.client - 
.get(`https://graph.instagram.com/${id}`) - .qs({ fields: 'media_url' }) - .auth(token) - .request((err, resp, body) => { - if (err || resp.statusCode !== 200) { - err = this._error(err, resp) - logger.error(err, 'provider.instagram.download.error') - onData(err) - return - } - - request(body.media_url) - .on('response', (resp) => { - if (resp.statusCode !== 200) { - onData(this._error(null, resp)) - } else { - resp.on('data', (chunk) => onData(null, chunk)) + async download ({ id, token }) { + try { + const body1 = await new Promise((resolve, reject) => ( + this.client + .get(`https://graph.instagram.com/${id}`) + .qs({ fields: 'media_url' }) + .auth(token) + .request((err, resp, body) => { + if (err || resp.statusCode !== 200) { + err = this._error(err, resp) + logger.error(err, 'provider.instagram.download.error') + reject(err) + return } + resolve(body) }) - .on('end', () => onData(null, null)) - .on('error', (err) => { - logger.error(err, 'provider.instagram.download.url.error') - onData(err) - }) - }) + )) + + const req = request(body1.media_url) + return await requestStream(req, async (res) => this._error(null, res)) + } catch (err) { + logger.error(err, 'provider.instagram.download.url.error') + throw err + } } - thumbnail (_, done) { + async thumbnail () { // not implementing this because a public thumbnail from instagram will be used instead - const err = new Error('call to thumbnail is not implemented') - logger.error(err, 'provider.instagram.thumbnail.error') - return done(err) + logger.error('call to thumbnail is not implemented', 'provider.instagram.thumbnail.error') + throw new Error('call to thumbnail is not implemented') } - size ({ id, token }, done) { + _size ({ id, token }, done) { return this.client .get(`https://graph.instagram.com/${id}`) .qs({ fields: 'media_url' }) @@ -122,16 +120,16 @@ class Instagram extends Provider { getURLMeta(body.media_url) .then(({ size }) => done(null, size)) - .catch((err) => { - logger.error(err, 
'provider.instagram.size.error') - done() + .catch((err2) => { + logger.error(err2, 'provider.instagram.size.error') + done(err2) }) }) } - logout (_, done) { + async logout () { // access revoke is not supported by Instagram's API - done(null, { revoked: false, manual_revoke_url: 'https://www.instagram.com/accounts/manage_access/' }) + return { revoked: false, manual_revoke_url: 'https://www.instagram.com/accounts/manage_access/' } } adaptData (res, username, directory, currentQuery) { @@ -171,4 +169,9 @@ class Instagram extends Provider { } } +Instagram.version = 2 + +Instagram.prototype.list = promisify(Instagram.prototype._list) +Instagram.prototype.size = promisify(Instagram.prototype._size) + module.exports = Instagram diff --git a/src/server/provider/onedrive/index.js b/src/server/provider/onedrive/index.js index 69b32ec740..b7ee7a62ae 100644 --- a/src/server/provider/onedrive/index.js +++ b/src/server/provider/onedrive/index.js @@ -1,10 +1,12 @@ -const Provider = require('../Provider') - const request = require('request') const purest = require('purest')({ request }) +const { promisify } = require('util') + +const Provider = require('../Provider') const logger = require('../../logger') const adapter = require('./adapter') const { ProviderApiError, ProviderAuthError } = require('../error') +const { requestStream } = require('../../helpers/utils') /** * Adapter for API https://docs.microsoft.com/en-us/onedrive/developer/rest-api/ @@ -37,7 +39,7 @@ class OneDrive extends Provider { * @param {object} options * @param {Function} done */ - list ({ directory, query, token }, done) { + _list ({ directory, query, token }, done) { const path = directory ? `items/${directory}` : 'root' const rootPath = query.driveId ? `/drives/${query.driveId}` : '/me/drive' const qs = { $expand: 'thumbnails' } @@ -66,34 +68,29 @@ class OneDrive extends Provider { }) } - download ({ id, token, query }, onData) { - const rootPath = query.driveId ? 
`/drives/${query.driveId}` : '/me/drive' - return this.client - .get(`${rootPath}/items/${id}/content`) - .auth(token) - .request() - .on('response', (resp) => { - if (resp.statusCode !== 200) { - onData(this._error(null, resp)) - } else { - resp.on('data', (chunk) => onData(null, chunk)) - } - }) - .on('end', () => onData(null, null)) - .on('error', (err) => { - logger.error(err, 'provider.onedrive.download.error') - onData(err) - }) + async download ({ id, token, query }) { + try { + const rootPath = query.driveId ? `/drives/${query.driveId}` : '/me/drive' + + const req = this.client + .get(`${rootPath}/items/${id}/content`) + .auth(token) + .request() + + return await requestStream(req, async (res) => this._error(null, res)) + } catch (err) { + logger.error(err, 'provider.onedrive.download.error') + throw err + } } - thumbnail (_, done) { + async thumbnail () { // not implementing this because a public thumbnail from onedrive will be used instead - const err = new Error('call to thumbnail is not implemented') - logger.error(err, 'provider.onedrive.thumbnail.error') - return done(err) + logger.error('call to thumbnail is not implemented', 'provider.onedrive.thumbnail.error') + throw new Error('call to thumbnail is not implemented') } - size ({ id, query, token }, done) { + _size ({ id, query, token }, done) { const rootPath = query.driveId ? 
`/drives/${query.driveId}` : '/me/drive' return this.client .get(`${rootPath}/items/${id}`) @@ -108,9 +105,8 @@ class OneDrive extends Provider { }) } - logout (_, done) { - // access revoke is not supported by Microsoft/OneDrive's API - done(null, { revoked: false, manual_revoke_url: 'https://account.live.com/consent/Manage' }) + async logout () { + return { revoked: false, manual_revoke_url: 'https://account.live.com/consent/Manage' } } adaptData (res, username) { @@ -146,4 +142,9 @@ class OneDrive extends Provider { } } +OneDrive.version = 2 + +OneDrive.prototype.list = promisify(OneDrive.prototype._list) +OneDrive.prototype.size = promisify(OneDrive.prototype._size) + module.exports = OneDrive diff --git a/src/server/provider/unsplash/index.js b/src/server/provider/unsplash/index.js index 4cf2085df9..424f85e0ca 100644 --- a/src/server/provider/unsplash/index.js +++ b/src/server/provider/unsplash/index.js @@ -1,9 +1,12 @@ const request = require('request') +const { promisify } = require('util') + const SearchProvider = require('../SearchProvider') const { getURLMeta } = require('../../helpers/request') const logger = require('../../logger') const adapter = require('./adapter') const { ProviderApiError } = require('../error') +const { requestStream } = require('../../helpers/utils') const BASE_URL = 'https://api.unsplash.com' @@ -38,7 +41,7 @@ function adaptData (body, currentQuery) { * Adapter for API https://api.unsplash.com */ class Unsplash extends SearchProvider { - list ({ token, query = { cursor: null, q: null } }, done) { + _list ({ token, query = { cursor: null, q: null } }, done) { const reqOpts = { url: `${BASE_URL}/search/photos`, method: 'GET', @@ -66,47 +69,49 @@ class Unsplash extends SearchProvider { }) } - download ({ id, token }, onData) { - const reqOpts = { - url: `${BASE_URL}/photos/${id}`, - method: 'GET', - json: true, - headers: { - Authorization: `Client-ID ${token}`, - }, - } - request(reqOpts, (err, resp, body) => { - if (err || 
resp.statusCode !== 200) { - const error = this.error(err, resp) - logger.error(error, 'provider.unsplash.download.error') - onData(error) - return + async download ({ id, token }) { + try { + const reqOpts = { + method: 'GET', + json: true, + headers: { + Authorization: `Client-ID ${token}`, + }, } - const url = body.links.download - - request - .get(url) - .on('response', (response) => { - if (response.statusCode !== 200) { - onData(this.error(null, response)) - } else { - response.on('data', (chunk) => onData(null, chunk)) + const body = await new Promise((resolve, reject) => ( + request({ ...reqOpts, url: `${BASE_URL}/photos/${id}` }, (err, resp, body2) => { + if (err || resp.statusCode !== 200) { + const err2 = this.error(err, resp) + logger.error(err, 'provider.unsplash.download.error') + reject(err2) + return } + resolve(body2) }) - .on('end', () => onData(null, null)) - // To attribute the author of the image, we call the `download_location` - // endpoint to increment the download count on Unsplash. - // https://help.unsplash.com/en/articles/2511258-guideline-triggering-a-download - .on('complete', () => request({ ...reqOpts, url: body.links.download_location })) - .on('error', (error) => { - logger.error(error, 'provider.unsplash.download.url.error') - onData(error) - }) - }) + )) + + const req = request.get(body.links.download) + const stream = await requestStream(req, async (res) => this.error(null, res)) + + // To attribute the author of the image, we call the `download_location` + // endpoint to increment the download count on Unsplash. 
+ // https://help.unsplash.com/en/articles/2511258-guideline-triggering-a-download + request({ ...reqOpts, url: body.links.download_location }, (err, resp) => { + if (err || resp.statusCode !== 200) { + const err2 = this.error(err, resp) + logger.error(err2, 'provider.unsplash.download.location.error') + } + }) + + return stream + } catch (err) { + logger.error(err, 'provider.unsplash.download.url.error') + throw err + } } - size ({ id, token }, done) { + _size ({ id, token }, done) { const reqOpts = { url: `${BASE_URL}/photos/${id}`, method: 'GET', @@ -126,9 +131,9 @@ class Unsplash extends SearchProvider { getURLMeta(body.links.download) .then(({ size }) => done(null, size)) - .catch((error) => { - logger.error(error, 'provider.unsplash.size.error') - done() + .catch((err2) => { + logger.error(err2, 'provider.unsplash.size.error') + done(err2) }) }) } @@ -146,4 +151,9 @@ class Unsplash extends SearchProvider { } } +Unsplash.version = 2 + +Unsplash.prototype.list = promisify(Unsplash.prototype._list) +Unsplash.prototype.size = promisify(Unsplash.prototype._size) + module.exports = Unsplash diff --git a/src/server/provider/zoom/index.js b/src/server/provider/zoom/index.js index ce6fffbffe..cd12a1945f 100644 --- a/src/server/provider/zoom/index.js +++ b/src/server/provider/zoom/index.js @@ -1,11 +1,13 @@ -const Provider = require('../Provider') - +const { promisify } = require('util') const request = require('request') const moment = require('moment-timezone') const purest = require('purest')({ request }) + +const Provider = require('../Provider') const logger = require('../../logger') const adapter = require('./adapter') const { ProviderApiError, ProviderAuthError } = require('../error') +const { requestStream } = require('../../helpers/utils') const BASE_URL = 'https://zoom.us/v2' const GET_LIST_PATH = '/users/me/recordings' @@ -31,7 +33,7 @@ class Zoom extends Provider { return 'zoom' } - list (options, done) { + _list (options, done) { /* - returns list of 
months by default - drill down for specific files in each month @@ -102,54 +104,50 @@ class Zoom extends Provider { .request(done) } - download ({ id, token, query }, done) { - // meeting id + file id required - // cc files don't have an ID or size - const meetingId = id - const fileId = query.recordingId - const { recordingStart } = query - const GET_MEETING_FILES = `/meetings/${encodeURIComponent(meetingId)}/recordings` + async download ({ id, token, query }) { + try { + // meeting id + file id required + // cc files don't have an ID or size + const meetingId = id + const fileId = query.recordingId + const { recordingStart } = query + const GET_MEETING_FILES = `/meetings/${encodeURIComponent(meetingId)}/recordings` + + const downloadUrl = await new Promise((resolve, reject) => { + this.client + .get(`${BASE_URL}${GET_MEETING_FILES}`) + .auth(token) + .request((err, resp) => { + if (err || resp.statusCode !== 200) { + const error = this._error(null, resp) + reject(error) + return + } + const file = resp + .body + .recording_files + .find(file => fileId === file.id || (file.file_type === fileId && file.recording_start === recordingStart)) + if (!file || !file.download_url) { + const error = this._error(null, resp) + reject(error) + return + } + resolve(file.download_url) + }) + }) - const downloadUrlPromise = new Promise((resolve) => { - this.client - .get(`${BASE_URL}${GET_MEETING_FILES}`) - .auth(token) - .request((err, resp) => { - if (err || resp.statusCode !== 200) { - return this._downloadError(resp, done) - } - const file = resp - .body - .recording_files - .find(file => fileId === file.id || (file.file_type === fileId && file.recording_start === recordingStart)) - if (!file || !file.download_url) { - return this._downloadError(resp, done) - } - resolve(file.download_url) - }) - }) - downloadUrlPromise.then((downloadUrl) => { - this.client + const req = this.client .get(`${downloadUrl}?access_token=${token}`) .request() - .on('response', (resp) => { - if 
(resp.statusCode !== 200) { - done(this._error(null, resp)) - } else { - resp.on('data', (chunk) => done(null, chunk)) - } - }) - .on('end', () => { - done(null, null) - }) - .on('error', (err) => { - logger.error(err, 'provider.zoom.download.error') - done(err) - }) - }) + + return await requestStream(req, async (res) => this._error(null, res)) + } catch (err) { + logger.error(err, 'provider.zoom.download.error') + throw err + } } - size ({ id, token, query }, done) { + _size ({ id, token, query }, done) { const meetingId = id const fileId = query.recordingId const { recordingStart } = query @@ -170,8 +168,7 @@ class Zoom extends Provider { if (!file) { return this._downloadError(resp, done) } - const maxExportFileSize = 10 * 1024 * 1024 // 10MB - done(null, file.file_size || maxExportFileSize) + done(null, file.file_size) // May be undefined. }) } @@ -254,7 +251,7 @@ class Zoom extends Provider { return data } - logout ({ companion, token }, done) { + _logout ({ companion, token }, done) { companion.getProviderCredentials().then(({ key, secret }) => { const encodedAuth = Buffer.from(`${key}:${secret}`, 'binary').toString('base64') return this.client @@ -276,15 +273,16 @@ class Zoom extends Provider { }).catch((err) => done(err)) } - deauthorizationCallback ({ companion, body, headers }, done) { + _deauthorizationCallback ({ companion, body, headers }, done) { if (!body || body.event !== DEAUTH_EVENT_NAME) { - return done(null, {}, 400) + done(null, { data: {}, status: 400 }) + return } companion.getProviderCredentials().then(({ verificationToken, key, secret }) => { const tokenSupplied = headers.authorization if (!tokenSupplied || verificationToken !== tokenSupplied) { - return done(null, {}, 400) + return done(null, { data: {}, status: 400 }) } const encodedAuth = Buffer.from(`${key}:${secret}`, 'binary').toString('base64') @@ -341,4 +339,11 @@ class Zoom extends Provider { } } +Zoom.version = 2 + +Zoom.prototype.list = promisify(Zoom.prototype._list) 
+Zoom.prototype.size = promisify(Zoom.prototype._size) +Zoom.prototype.logout = promisify(Zoom.prototype._logout) +Zoom.prototype.deauthorizationCallback = promisify(Zoom.prototype._deauthorizationCallback) + module.exports = Zoom diff --git a/src/standalone/helper.js b/src/standalone/helper.js index 0494344c36..484e0a8874 100644 --- a/src/standalone/helper.js +++ b/src/standalone/helper.js @@ -102,6 +102,8 @@ const getConfigFromEnv = () => { // cookieDomain is kind of a hack to support distributed systems. This should be improved but we never got so far. cookieDomain: process.env.COMPANION_COOKIE_DOMAIN, multipleInstances: true, + streamingUpload: process.env.COMPANION_STREAMING_UPLOAD === 'true', + maxFileSize: process.env.COMPANION_MAX_FILE_SIZE ? parseInt(process.env.COMPANION_MAX_FILE_SIZE, 10) : undefined, } } diff --git a/test/__mocks__/purest.js b/test/__mocks__/purest.js index ec3b770273..89102ea5a5 100644 --- a/test/__mocks__/purest.js +++ b/test/__mocks__/purest.js @@ -1,5 +1,6 @@ const fs = require('fs') const qs = require('querystring') + const fixtures = require('../fixtures').providers function has (object, property) { @@ -33,6 +34,14 @@ class MockPurest { return this } + _getStatusCode () { + const { validators } = fixtures[this.opts.providerName] + if (validators && validators[this._requestUrl]) { + return validators[this._requestUrl](this._requestOptions) ? 
200 : 400 + } + return 200 + } + request (done) { if (typeof done === 'function') { const { responses } = fixtures[this.opts.providerName] @@ -40,14 +49,10 @@ class MockPurest { const endpointResponses = responses[url] || responses[this._requestUrl] if (endpointResponses == null || !has(endpointResponses, this._method)) { done(new Error(`No fixture for ${this._method} ${url}`)) - return + return this } - let statusCode = 200 - const { validators } = fixtures[this.opts.providerName] - if (validators && validators[this._requestUrl]) { - statusCode = validators[this._requestUrl](this._requestOptions) ? 200 : 400 - } + const statusCode = this._getStatusCode() const body = statusCode === 200 ? endpointResponses[this._method] : {} done(null, { body, statusCode }, body) @@ -58,7 +63,9 @@ class MockPurest { on (evt, cb) { if (evt === 'response') { - cb(fs.createReadStream('./README.md')) + const stream = fs.createReadStream('./README.md') + stream.statusCode = this._getStatusCode() + cb(stream) } return this } diff --git a/test/__tests__/providers.js b/test/__tests__/providers.js index 9e4ab149bc..dcfc4f14dc 100644 --- a/test/__tests__/providers.js +++ b/test/__tests__/providers.js @@ -10,9 +10,12 @@ jest.mock('../../src/server/helpers/request', () => { jest.mock('../../src/server/helpers/oauth-state', () => require('../mockoauthstate')()) const request = require('supertest') +const nock = require('nock') + const fixtures = require('../fixtures') const tokenService = require('../../src/server/helpers/jwt') const { getServer } = require('../mockserver') +const defaults = require('../fixtures/constants') const authServer = getServer() const OAUTH_STATE = 'some-cool-nice-encrytpion' @@ -37,6 +40,16 @@ const thisOrThat = (value1, value2) => { return value2 } +beforeAll(() => { + const url = new URL(defaults.THUMBNAIL_URL) + nock(url.origin).get(url.pathname).reply(200, () => '').persist() +}) + +afterAll(() => { + nock.cleanAll() + nock.restore() +}) + describe('set i-am 
header', () => { test.each(providerNames)('set i-am header in response (%s)', (providerName) => { const providerFixtures = fixtures.providers[providerName].expects @@ -83,7 +96,7 @@ describe('list provider files', () => { }) }) -describe('download provdier file', () => { +describe('download provider file', () => { test.each(providerNames)('specified file gets downloaded from %s', (providerName) => { const providerFixtures = fixtures.providers[providerName].expects return request(authServer) diff --git a/test/__tests__/uploader.js b/test/__tests__/uploader.js index 8f6c761246..6ba32c07ef 100644 --- a/test/__tests__/uploader.js +++ b/test/__tests__/uploader.js @@ -2,7 +2,9 @@ jest.mock('tus-js-client') +const intoStream = require('into-stream') const fs = require('fs') + const Uploader = require('../../src/server/Uploader') const socketClient = require('../mocksocket') const standalone = require('../../src/standalone') @@ -37,8 +39,9 @@ describe('uploader with tus protocol', () => { expect(new Uploader(opts).hasError()).toBe(false) }) - test('upload functions with tus protocol', () => { + test('upload functions with tus protocol', async () => { const fileContent = Buffer.from('Some file content') + const stream = intoStream(fileContent) const opts = { companionOptions, endpoint: 'http://url.myendpoint.com/files', @@ -52,14 +55,62 @@ describe('uploader with tus protocol', () => { expect(uploader.hasError()).toBe(false) expect(uploadToken).toBeTruthy() - return new Promise((resolve) => { + return new Promise((resolve, reject) => { // validate that the test is resolved on socket connection - uploader.onSocketReady(() => { - const fileInfo = fs.statSync(uploader.path) - expect(fileInfo.isFile()).toBe(true) - expect(fileInfo.size).toBe(0) - uploader.handleChunk(null, fileContent) - uploader.handleChunk(null, null) + uploader.awaitReady().then(() => { + uploader.uploadStream(stream).then(() => resolve()) + }) + + let progressReceived = 0 + // emulate socket connection + 
socketClient.connect(uploadToken) + socketClient.onProgress(uploadToken, (message) => { + progressReceived = message.payload.bytesUploaded + try { + expect(message.payload.bytesTotal).toBe(fileContent.length) + } catch (err) { + reject(err) + } + }) + socketClient.onUploadSuccess(uploadToken, (message) => { + try { + expect(progressReceived).toBe(fileContent.length) + // see __mocks__/tus-js-client.js + expect(message.payload.url).toBe('https://tus.endpoint/files/foo-bar') + } catch (err) { + reject(err) + } + }) + }) + }) + + test('upload functions with tus protocol without size', async () => { + const fileContent = Buffer.alloc(1e6) + const stream = intoStream(fileContent) + const opts = { + companionOptions, + endpoint: 'http://url.myendpoint.com/files', + protocol: 'tus', + size: null, + pathPrefix: companionOptions.filePath, + } + + const uploader = new Uploader(opts) + const uploadToken = uploader.token + expect(uploader.hasError()).toBe(false) + expect(uploadToken).toBeTruthy() + + return new Promise((resolve, reject) => { + // validate that the test is resolved on socket connection + uploader.awaitReady().then(() => { + uploader.uploadStream(stream).then(() => { + try { + expect(fs.existsSync(uploader.path)).toBe(false) + resolve() + } catch (err) { + reject(err) + } + }) }) let progressReceived = 0 @@ -67,22 +118,80 @@ describe('uploader with tus protocol', () => { socketClient.connect(uploadToken) socketClient.onProgress(uploadToken, (message) => { // validate that the file has been downloaded and saved into the file path - const fileInfo = fs.statSync(uploader.path) - expect(fileInfo.isFile()).toBe(true) - expect(fileInfo.size).toBe(fileContent.length) + try { + progressReceived = message.payload.bytesUploaded - progressReceived = message.payload.bytesUploaded - expect(message.payload.bytesTotal).toBe(fileContent.length) + if (progressReceived === fileContent.length) { + const fileInfo = fs.statSync(uploader.tmpPath) + 
expect(fileInfo.isFile()).toBe(true) + expect(fileInfo.size).toBe(fileContent.length) + expect(message.payload.bytesTotal).toBe(fileContent.length) + } + } catch (err) { + reject(err) + } }) socketClient.onUploadSuccess(uploadToken, (message) => { - expect(progressReceived).toBe(fileContent.length) - // see __mocks__/tus-js-client.js - expect(message.payload.url).toBe('https://tus.endpoint/files/foo-bar') - setTimeout(() => { - // check that file has been cleaned up - expect(fs.existsSync(uploader.path)).toBe(false) + try { + expect(progressReceived).toBe(fileContent.length) + // see __mocks__/tus-js-client.js + expect(message.payload.url).toBe('https://tus.endpoint/files/foo-bar') + } catch (err) { + reject(err) + } + }) + }) + }) + + test('uploader respects maxFileSize', async () => { + const opts = { + endpoint: 'http://url.myendpoint.com/files', + companionOptions: { ...companionOptions, maxFileSize: 100 }, + size: 101, + } + + const uploader = new Uploader(opts) + expect(uploader.hasError()).toBe(true) + }) + + test('uploader respects maxFileSize correctly', async () => { + const opts = { + endpoint: 'http://url.myendpoint.com/files', + companionOptions: { ...companionOptions, maxFileSize: 100 }, + size: 99, + } + + const uploader = new Uploader(opts) + expect(uploader.hasError()).toBe(false) + }) + + test('uploader respects maxFileSize with unknown size', async () => { + const fileContent = Buffer.alloc(10000) + const stream = intoStream(fileContent) + const opts = { + companionOptions: { ...companionOptions, maxFileSize: 1000 }, + endpoint: 'http://url.myendpoint.com/files', + protocol: 'tus', + size: null, + pathPrefix: companionOptions.filePath, + } + + const uploader = new Uploader(opts) + const uploadToken = uploader.token + expect(uploader.hasError()).toBe(false) + + // validate that the test is resolved on socket connection + uploader.awaitReady().then(uploader.uploadStream(stream)) + socketClient.connect(uploadToken) + + return new Promise((resolve, 
reject) => { + socketClient.onUploadError(uploadToken, (message) => { + try { + expect(message).toMatchObject({ payload: { error: { message: 'maxFileSize exceeded' } } }) resolve() - }, 100) + } catch (err) { + reject(err) + } }) }) }) diff --git a/test/__tests__/url.js b/test/__tests__/url.js index a3c1d0d8a3..201518cdad 100644 --- a/test/__tests__/url.js +++ b/test/__tests__/url.js @@ -1,8 +1,12 @@ /* global jest:false, test:false, expect:false, describe:false */ +const nock = require('nock') +const request = require('supertest') + jest.mock('tus-js-client') jest.mock('../../src/server/helpers/request', () => { return { + ...jest.requireActual('../../src/server/helpers/request'), getURLMeta: () => { return Promise.resolve({ size: 7580, type: 'image/jpg' }) }, @@ -11,7 +15,15 @@ jest.mock('../../src/server/helpers/request', () => { const { getServer } = require('../mockserver') const mockServer = getServer() -const request = require('supertest') + +beforeAll(() => { + nock('http://url.myendpoint.com').get('/files').reply(200, () => '') +}) + +afterAll(() => { + nock.cleanAll() + nock.restore() +}) const invalids = [ // no url at all or unsupported protocol diff --git a/test/fixtures/facebook.js b/test/fixtures/facebook.js index 20b09bcd71..ff2ebf159f 100644 --- a/test/fixtures/facebook.js +++ b/test/fixtures/facebook.js @@ -43,9 +43,6 @@ module.exports.responses = { id: defaults.ITEM_ID, }, }, - [defaults.THUMBNAIL_URL]: { - get: {}, - }, } module.exports.expects = { diff --git a/test/mocksocket.js b/test/mocksocket.js index c9ffe52f4c..dd2cf53aab 100644 --- a/test/mocksocket.js +++ b/test/mocksocket.js @@ -19,3 +19,11 @@ module.exports.onUploadSuccess = (uploadToken, cb) => { } }) } + +module.exports.onUploadError = (uploadToken, cb) => { + emitter().on(uploadToken, (message) => { + if (message.action === 'error') { + cb(message) + } + }) +}