Skip to content

Commit

Permalink
stream: add toArray
Browse files Browse the repository at this point in the history
Add the toArray method from the TC39 iterator helper proposal to
Readable streams. This also enables a common-use case of converting a
stream to an array.

Co-Authored-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
benjamingr and ronag committed Jan 17, 2022
1 parent 3f0bcfb commit c9d25ab
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 0 deletions.
40 changes: 40 additions & 0 deletions doc/api/stream.md
Expand Up @@ -1889,6 +1889,46 @@ await dnsResults.forEach((result) => {
console.log('done'); // Stream has finished
```

### `readable.toArray([options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `options` {Object}
* `signal` {AbortSignal} allows cancelling the toArray operation if the
signal is aborted.
* Returns: {Promise} a promise containing an array (if the stream is in
object mode) or Buffer with the contents of the stream.

This method allows easily obtaining the contents of a stream. If the
stream is in [object mode][object-mode] an array of its contents is returned.
If the stream is not in object mode a Buffer containing its data is returned.

As this method reads the entire stream into memory, it negates the benefits of
streams. It's intended for interoperability and convenience, not as the primary
way to consume streams.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an aray using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
17 changes: 17 additions & 0 deletions lib/internal/streams/operators.js
@@ -1,6 +1,8 @@
'use strict';

const { AbortController } = require('internal/abort_controller');
const { Buffer } = require('buffer');

const {
codes: {
ERR_INVALID_ARG_TYPE,
Expand All @@ -10,6 +12,7 @@ const {
const { validateInteger } = require('internal/validators');

const {
ArrayPrototypePush,
MathFloor,
Promise,
PromiseReject,
Expand Down Expand Up @@ -174,11 +177,25 @@ async function * filter(fn, options) {
yield* this.map(filterFn, options);
}

async function toArray(options) {
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
if (!this.readableObjectMode) {
return Buffer.concat(result);
}
return result;
}
module.exports.streamReturningOperators = {
filter,
map,
};

module.exports.promiseReturningOperators = {
forEach,
toArray,
};
83 changes: 83 additions & 0 deletions test/parallel/test-stream-toArray.js
@@ -0,0 +1,83 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');

{
// Works on a synchronous stream
(async () => {
const tests = [
[],
[1],
[1, 2, 3],
Array(100).fill().map((_, i) => i),
];
for (const test of tests) {
const stream = Readable.from(test);
const result = await stream.toArray();
assert.deepStrictEqual(result, test);
}
})().then(common.mustCall());
}

{
// Works on a non-object-mode stream and flattens it
(async () => {
const stream = Readable.from(
[Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])]
, { objectMode: false });
const result = await stream.toArray();
assert.strictEqual(Buffer.isBuffer(result), true);
assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]);
})().then(common.mustCall());
}

{
// Works on an asynchronous stream
(async () => {
const tests = [
[],
[1],
[1, 2, 3],
Array(100).fill().map((_, i) => i),
];
for (const test of tests) {
const stream = Readable.from(test).map((x) => Promise.resolve(x));
const result = await stream.toArray();
assert.deepStrictEqual(result, test);
}
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
let stream;
assert.rejects(async () => {
stream = Readable.from([1, 2, 3]).map(async (x) => {
if (x === 3) {
await setTimeout(100, {}); // Explicitly do not pass signal here
}
Promise.resolve(x);
});
await stream.toArray({ signal: ac.signal });
}, {
name: 'AbortError',
}).then(common.mustCall(() => {
// Only stops toArray, does not destory the stream
assert(stream.destroyed, false);
}));

setImmediate(() => {
ac.abort();
});
}
{
// Test result is a Promise
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
assert.strictEqual(result instanceof Promise, true);
}

0 comments on commit c9d25ab

Please sign in to comment.