Skip to content

Commit

Permalink
stream: support flatMap
Browse files Browse the repository at this point in the history
Support the `flatMap` method from the iterator helper TC39 proposal on
readable streams.

Co-Authored-By: Robert Nagy <ronagy@icloud.com>
PR-URL: #41612
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
2 people authored and danielleadams committed Apr 21, 2022
1 parent 84752a4 commit 205c018
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 0 deletions.
49 changes: 49 additions & 0 deletions doc/api/stream.md
Expand Up @@ -1972,6 +1972,55 @@ console.log(allBigFiles);
console.log('done'); // Stream has finished
```

### `readable.flatMap(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
every item in the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream flat-mapped with the function `fn`.

This method returns a new stream by applying the given callback to each
chunk of the stream and then flattening the result.

It is possible to return a stream or another iterable or async iterable from
`fn` and the result streams will be merged (flattened) into the returned
stream.

```mjs
import { Readable } from 'stream';
import { createReadStream } from 'fs';

// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
'./1.mjs',
'./2.mjs',
'./3.mjs',
'./4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
// This will contain the contents (all chunks) of all 4 files
console.log(result);
}
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
8 changes: 8 additions & 0 deletions lib/internal/streams/operators.js
Expand Up @@ -229,8 +229,16 @@ async function toArray(options) {
}
return result;
}

async function* flatMap(fn, options) {
for await (const val of this.map(fn, options)) {
yield* val;
}
}

module.exports.streamReturningOperators = {
filter,
flatMap,
map,
};

Expand Down
131 changes: 131 additions & 0 deletions test/parallel/test-stream-flatMap.js
@@ -0,0 +1,131 @@
'use strict';

const common = require('../common');
const fixtures = require('../common/fixtures');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
const { createReadStream } = require('fs');

function oneTo5() {
return Readable.from([1, 2, 3, 4, 5]);
}

{
// flatMap works on synchronous streams with a synchronous mapper
(async () => {
assert.deepStrictEqual(
await oneTo5().flatMap((x) => [x + x]).toArray(),
[2, 4, 6, 8, 10]
);
assert.deepStrictEqual(
await oneTo5().flatMap(() => []).toArray(),
[]
);
assert.deepStrictEqual(
await oneTo5().flatMap((x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
})().then(common.mustCall());
}


{
// flatMap works on sync/async streams with an asynchronous mapper
(async () => {
assert.deepStrictEqual(
await oneTo5().flatMap(async (x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
const asyncOneTo5 = oneTo5().map(async (x) => x);
assert.deepStrictEqual(
await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
})().then(common.mustCall());
}
{
// flatMap works on a stream where mapping returns a stream
(async () => {
const result = await oneTo5().flatMap(async (x) => {
return Readable.from([x, x]);
}).toArray();
assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
})().then(common.mustCall());
// flatMap works on an objectMode stream where mappign returns a stream
(async () => {
const result = await oneTo5().flatMap(() => {
return createReadStream(fixtures.path('x.txt'));
}).toArray();
// The resultant stream is in object mode so toArray shouldn't flatten
assert.strictEqual(result.length, 5);
assert.deepStrictEqual(
Buffer.concat(result).toString(),
'xyz\n'.repeat(5)
);

})().then(common.mustCall());

}

{
// Concurrency + AbortSignal
const ac = new AbortController();
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
await setTimeout(100, { signal });
}), { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());

queueMicrotask(() => {
ac.abort();
});
}

{
// Already aborted AbortSignal
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
await setTimeout(100, { signal });
}), { signal: AbortSignal.abort() });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
}

{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).flatMap(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).flatMap((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Readable
const stream = oneTo5().flatMap((x) => x);
assert.strictEqual(stream.readable, true);
}

0 comments on commit 205c018

Please sign in to comment.