Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add flatMap to readable #41612

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 49 additions & 0 deletions doc/api/stream.md
Expand Up @@ -2027,6 +2027,55 @@ console.log(allBigFiles);
console.log('done'); // Stream has finished
```

### `readable.flatMap(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
every item in the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream flat-mapped with the function `fn`.

This method returns a new stream by applying the given callback to each
chunk of the stream and then flattening the result.

It is possible to return a stream or another iterable or async iterable from
`fn` and the result streams will be merged (flattened) into the returned
stream.

```mjs
import { Readable } from 'stream';
import { createReadStream } from 'fs';

// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(item); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
'./1.mjs',
'./2.mjs',
'./3.mjs',
'./4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
// This will contain the contents (all chunks) of all 4 files
console.log(result);
}
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
8 changes: 8 additions & 0 deletions lib/internal/streams/operators.js
Expand Up @@ -229,8 +229,16 @@ async function toArray(options) {
}
return result;
}

async function* flatMap(fn, options) {
for await (const val of this.map(fn, options)) {
yield* val;
}
}

module.exports.streamReturningOperators = {
filter,
flatMap,
map,
};

Expand Down
131 changes: 131 additions & 0 deletions test/parallel/test-stream-flatMap.js
@@ -0,0 +1,131 @@
'use strict';

const common = require('../common');
const fixtures = require('../common/fixtures');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
const { createReadStream } = require('fs');

function oneTo5() {
return Readable.from([1, 2, 3, 4, 5]);
}

{
// flatMap works on synchronous streams with a synchronous mapper
(async () => {
assert.deepStrictEqual(
await oneTo5().flatMap((x) => [x + x]).toArray(),
[2, 4, 6, 8, 10]
);
assert.deepStrictEqual(
await oneTo5().flatMap(() => []).toArray(),
[]
);
assert.deepStrictEqual(
await oneTo5().flatMap((x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
})().then(common.mustCall());
}


{
// flatMap works on sync/async streams with an asynchronous mapper
(async () => {
assert.deepStrictEqual(
await oneTo5().flatMap(async (x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
const asyncOneTo5 = oneTo5().map(async (x) => x);
assert.deepStrictEqual(
await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
})().then(common.mustCall());
}
{
// flatMap works on a stream where mapping returns a stream
(async () => {
const result = await oneTo5().flatMap(async (x) => {
return Readable.from([x, x]);
}).toArray();
assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
})().then(common.mustCall());
// flatMap works on an objectMode stream where mappign returns a stream
(async () => {
const result = await oneTo5().flatMap(() => {
return createReadStream(fixtures.path('x.txt'));
}).toArray();
// The resultant stream is in object mode so toArray shouldn't flatten
assert.strictEqual(result.length, 5);
assert.deepStrictEqual(
Buffer.concat(result).toString(),
'xyz\n'.repeat(5)
);

})().then(common.mustCall());

}

{
// Concurrency + AbortSignal
const ac = new AbortController();
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
await setTimeout(100, { signal });
}), { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());

queueMicrotask(() => {
ac.abort();
});
}

{
// Already aborted AbortSignal
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
await setTimeout(100, { signal });
}), { signal: AbortSignal.abort() });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
}

{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).flatMap(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).flatMap((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Readable
const stream = oneTo5().flatMap((x) => x);
assert.strictEqual(stream.readable, true);
}