Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add compose operator #44937

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 39 additions & 0 deletions doc/api/stream.md
Expand Up @@ -1681,6 +1681,41 @@ option. In the code example above, data will be in a single chunk if the file
has less then 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable.compose(stream[, options])`

<!-- YAML
added: REPLACEME
-->

rluvaton marked this conversation as resolved.
Show resolved Hide resolved
> Stability: 1 - Experimental

* `stream` {Stream|Iterable|AsyncIterable|Function}
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Duplex} a stream composed with the stream `stream`.

```mjs
import { Readable } from 'node:stream';

async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');

for (const word of words) {
yield word;
}
}
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
```

See [`stream.compose`][] for more information.

##### `readable.iterator([options])`

<!-- YAML
Expand Down Expand Up @@ -2715,6 +2750,8 @@ await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
```

See [`readable.compose(stream)`][] for `stream.compose` as operator.

### `stream.Readable.from(iterable[, options])`

<!-- YAML
Expand Down Expand Up @@ -4482,11 +4519,13 @@ contain multi-byte characters.
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`readable._read()`]: #readable_readsize
[`readable.compose(stream)`]: #readablecomposestream-options
[`readable.map`]: #readablemapfn-options
[`readable.push('')`]: #readablepush
[`readable.setEncoding()`]: #readablesetencodingencoding
[`stream.Readable.from()`]: #streamreadablefromiterable-options
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
[`stream.compose`]: #streamcomposestreams
[`stream.cork()`]: #writablecork
[`stream.finished()`]: #streamfinishedstream-options-callback
[`stream.pipe()`]: #readablepipedestination-options
Expand Down
32 changes: 32 additions & 0 deletions lib/internal/streams/operators.js
Expand Up @@ -4,6 +4,7 @@ const { AbortController } = require('internal/abort_controller');

const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_INVALID_ARG_TYPE,
ERR_MISSING_ARGS,
ERR_OUT_OF_RANGE,
Expand All @@ -17,6 +18,11 @@ const {
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
const staticCompose = require('internal/streams/compose');
const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');
const { isWritable, isNodeStream } = require('internal/streams/utils');

const {
ArrayPrototypePush,
Expand All @@ -32,6 +38,31 @@ const {
const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

function compose(stream, options) {
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}
rluvaton marked this conversation as resolved.
Show resolved Hide resolved

if (isNodeStream(stream) && !isWritable(stream)) {
throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
}

const composedStream = staticCompose(this, stream);
rluvaton marked this conversation as resolved.
Show resolved Hide resolved

if (options?.signal) {
// Not validating as we already validated before
addAbortSignalNoValidate(
options.signal,
composedStream
);
}

return composedStream;
}

function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -392,6 +423,7 @@ module.exports.streamReturningOperators = {
flatMap,
map,
take,
compose,
};

module.exports.promiseReturningOperators = {
Expand Down
127 changes: 127 additions & 0 deletions test/parallel/test-stream-compose-operator.js
@@ -0,0 +1,127 @@
'use strict';

const common = require('../common');
const {
Readable, Transform,
} = require('stream');
const assert = require('assert');

{
// with async generator
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
let str = '';
for await (const chunk of stream) {
str += chunk;

if (str.length === 2) {
yield str;
str = '';
}
}
});
const result = ['ab', 'cd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// With Transformer
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
objectMode: true,
transform: common.mustCall((chunk, encoding, callback) => {
callback(null, chunk);
}, 4)
}));
const result = ['a', 'b', 'c', 'd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Throwing an error during `compose` (before waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

throw new Error('boom');
});

assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, /boom/).then(common.mustCall());
}

{
// Throwing an error during `compose` (when waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
for await (const chunk of stream) {
if (chunk === 3) {
throw new Error('boom');
}
yield chunk;
}
});

assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}

{
// Throwing an error during `compose` (after finishing all readable data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

// eslint-disable-next-line no-unused-vars,no-empty
for await (const chunk of stream) {
}

throw new Error('boom');
});
assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}

{
// AbortSignal
const ac = new AbortController();
const stream = Readable.from([1, 2, 3, 4, 5])
.compose(async function *(source) {
// Should not reach here
for await (const chunk of source) {
yield chunk;
}
}, { signal: ac.signal });

ac.abort();

assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
}

{
assert.throws(
() => Readable.from(['a']).compose(Readable.from(['b'])),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}

{
assert.throws(
() => Readable.from(['a']).compose(),
{ code: 'ERR_INVALID_ARG_TYPE' }
);
}
27 changes: 12 additions & 15 deletions test/parallel/test-stream-compose.js
Expand Up @@ -358,27 +358,24 @@ const assert = require('assert');
}

{
try {
compose();
} catch (err) {
assert.strictEqual(err.code, 'ERR_MISSING_ARGS');
}
assert.throws(
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
() => compose(),
{ code: 'ERR_MISSING_ARGS' }
);
}

{
try {
compose(new Writable(), new PassThrough());
} catch (err) {
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
}
assert.throws(
() => compose(new Writable(), new PassThrough()),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}

{
try {
compose(new PassThrough(), new Readable({ read() {} }), new PassThrough());
} catch (err) {
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
}
assert.throws(
() => compose(new PassThrough(), new Readable({ read() {} }), new PassThrough()),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}

{
Expand Down