From 527e2147afe43c7b19fd140ebd048c896705da7f Mon Sep 17 00:00:00 2001 From: rickyes Date: Sun, 28 Jun 2020 16:29:01 +0800 Subject: [PATCH] stream: add promises version to utility functions PR-URL: https://github.com/nodejs/node/pull/33991 Fixes: https://github.com/nodejs/node/issues/33582 Reviewed-By: James M Snell Reviewed-By: Denys Otrishko Reviewed-By: Matteo Collina Reviewed-By: Trivikram Kamat Reviewed-By: Robert Nagy --- doc/api/stream.md | 22 ++++-- lib/stream.js | 36 +++++++++ lib/stream/promises.js | 39 ++++++++++ node.gyp | 1 + test/parallel/test-stream-promises.js | 103 ++++++++++++++++++++++++++ 5 files changed, 193 insertions(+), 8 deletions(-) create mode 100644 lib/stream/promises.js create mode 100644 test/parallel/test-stream-promises.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 060cb628443b8d..361958a11a6c33 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -48,6 +48,13 @@ Additionally, this module includes the utility functions [`stream.pipeline()`][], [`stream.finished()`][] and [`stream.Readable.from()`][]. +### Streams Promises API + +The `stream/promises` API provides an alternative set of asynchronous utility +functions for streams that return `Promise` objects rather than using +callbacks. The API is accessible via `require('stream/promises')` +or `require('stream').promises`. + ### Object mode All streams created by Node.js APIs operate exclusively on strings and `Buffer` @@ -1597,10 +1604,10 @@ Especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request), and will not emit `'end'` or `'finish'`. -The `finished` API is promisify-able as well; +The `finished` API provides promise version: ```js -const finished = util.promisify(stream.finished); +const { finished } = require('stream/promises'); const rs = fs.createReadStream('archive.tar'); @@ -1684,10 +1691,10 @@ pipeline( ); ``` -The `pipeline` API is promisify-able as well: +The `pipeline` API provides promise version: ```js -const pipeline = util.promisify(stream.pipeline); +const { pipeline } = require('stream/promises'); async function run() { await pipeline( @@ -1704,7 +1711,7 @@ run().catch(console.error); The `pipeline` API also supports async generators: ```js -const pipeline = util.promisify(stream.pipeline); +const { pipeline } = require('stream/promises'); const fs = require('fs'); async function run() { @@ -2927,9 +2934,9 @@ handling of backpressure and errors. [`stream.pipeline()`][] abstracts away the handling of backpressure and backpressure-related errors: ```js -const { pipeline } = require('stream'); -const util = require('util'); const fs = require('fs'); +const { pipeline } = require('stream'); +const { pipeline: pipelinePromise } = require('stream/promises'); const writable = fs.createWriteStream('./file'); @@ -2943,7 +2950,6 @@ pipeline(iterator, writable, (err, value) => { }); // Promise Pattern -const pipelinePromise = util.promisify(pipeline); pipelinePromise(iterator, writable) .then((value) => { console.log(value, 'value returned'); diff --git a/lib/stream.js b/lib/stream.js index 725038ba9c0d1c..ed6cc19753c806 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -21,10 +21,21 @@ 'use strict'; +const { + ObjectDefineProperty, +} = primordials; + +const { + promisify: { custom: customPromisify }, +} = require('internal/util'); + const pipeline = require('internal/streams/pipeline'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); +// Lazy loaded +let promises = null; + // Note: export Stream before Readable/Writable/Duplex/... // to avoid a cross-reference(require) issues const Stream = module.exports = require('internal/streams/legacy'); @@ -38,6 +49,31 @@ Stream.PassThrough = require('_stream_passthrough'); Stream.pipeline = pipeline; Stream.finished = eos; +ObjectDefineProperty(Stream, 'promises', { + configurable: true, + enumerable: true, + get() { + if (promises === null) promises = require('stream/promises'); + return promises; + } +}); + +ObjectDefineProperty(pipeline, customPromisify, { + enumerable: true, + get() { + if (promises === null) promises = require('stream/promises'); + return promises.pipeline; + } +}); + +ObjectDefineProperty(eos, customPromisify, { + enumerable: true, + get() { + if (promises === null) promises = require('stream/promises'); + return promises.finished; + } +}); + // Backwards-compat with node 0.4.x Stream.Stream = Stream; diff --git a/lib/stream/promises.js b/lib/stream/promises.js new file mode 100644 index 00000000000000..986db2e1f8db8a --- /dev/null +++ b/lib/stream/promises.js @@ -0,0 +1,39 @@ +'use strict'; + +const { + Promise, +} = primordials; + +let pl; +let eos; + +function pipeline(...streams) { + if (!pl) pl = require('internal/streams/pipeline'); + return new Promise((resolve, reject) => { + pl(...streams, (err, value) => { + if (err) { + reject(err); + } else { + resolve(value); + } + }); + }); +} + +function finished(stream, opts) { + if (!eos) eos = require('internal/streams/end-of-stream'); + return new Promise((resolve, reject) => { + eos(stream, opts, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +} + +module.exports = { + finished, + pipeline, +}; diff --git a/node.gyp b/node.gyp index 23f9e2ec7ffe5c..da6745e1c5f00a 100644 --- a/node.gyp +++ b/node.gyp @@ -78,6 +78,7 @@ 'lib/readline.js', 'lib/repl.js', 'lib/stream.js', + 'lib/stream/promises.js', 'lib/_stream_readable.js', 'lib/_stream_writable.js', 'lib/_stream_duplex.js', diff --git a/test/parallel/test-stream-promises.js b/test/parallel/test-stream-promises.js new file mode 100644 index 00000000000000..86c44b279fa4a1 --- /dev/null +++ b/test/parallel/test-stream-promises.js @@ -0,0 +1,103 @@ +'use strict'; + +const common = require('../common'); +const stream = require('stream'); +const { + Readable, + Writable, + promises, +} = stream; +const { + finished, + pipeline, +} = require('stream/promises'); +const fs = require('fs'); +const assert = require('assert'); +const { promisify } = require('util'); + +assert.strictEqual(promises.pipeline, pipeline); +assert.strictEqual(promises.finished, finished); +assert.strictEqual(pipeline, promisify(stream.pipeline)); +assert.strictEqual(finished, promisify(stream.finished)); + +// pipeline success +{ + let finished = false; + const processed = []; + const expected = [ + Buffer.from('a'), + Buffer.from('b'), + Buffer.from('c') + ]; + + const read = new Readable({ + read() { } + }); + + const write = new Writable({ + write(data, enc, cb) { + processed.push(data); + cb(); + } + }); + + write.on('finish', () => { + finished = true; + }); + + for (let i = 0; i < expected.length; i++) { + read.push(expected[i]); + } + read.push(null); + + pipeline(read, write).then(common.mustCall((value) => { + assert.ok(finished); + assert.deepStrictEqual(processed, expected); + })); +} + +// pipeline error +{ + const read = new Readable({ + read() { } + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push('data'); + setImmediate(() => read.destroy()); + + pipeline(read, write).catch(common.mustCall((err) => { + assert.ok(err, 'should have an error'); + })); +} + +// finished success +{ + async function run() { + const rs = fs.createReadStream(__filename); + + let ended = false; + rs.resume(); + rs.on('end', () => { + ended = true; + }); + await finished(rs); + assert(ended); + } + + run().then(common.mustCall()); +} + +// finished error +{ + const rs = fs.createReadStream('file-does-not-exist'); + + assert.rejects(finished(rs), { + code: 'ENOENT' + }).then(common.mustCall()); +}