Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add compose operator #44937

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 26 additions & 0 deletions lib/internal/streams/operators.js
Expand Up @@ -17,6 +17,10 @@ const {
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
const staticCompose = require('internal/streams/compose');
const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');

const {
ArrayPrototypePush,
Expand All @@ -32,6 +36,27 @@ const {
const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

function compose(stream, options) {
if (options != null) {
validateObject(options, 'options');
}
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}
rluvaton marked this conversation as resolved.
Show resolved Hide resolved

const composedStream = staticCompose(this, stream);
rluvaton marked this conversation as resolved.
Show resolved Hide resolved

if (options?.signal) {
// Not validating as we already validated before
addAbortSignalNoValidate(
options.signal,
composedStream
);
}

return composedStream;
}

function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -392,6 +417,7 @@ module.exports.streamReturningOperators = {
flatMap,
map,
take,
compose,
};

module.exports.promiseReturningOperators = {
Expand Down
116 changes: 116 additions & 0 deletions test/parallel/test-stream-compose-operator.js
@@ -0,0 +1,116 @@
'use strict';

const common = require('../common');
const {
Readable, Transform,
} = require('stream');
const assert = require('assert');

{
// with async generator
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
let str = '';
for await (const chunk of stream) {
str += chunk;

if (str.length === 2) {
yield str;
str = '';
}
}
});
const result = ['ab', 'cd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// With Transformer
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
objectMode: true,
transform: common.mustCall((chunk, encoding, callback) => {
callback(null, chunk);
}, 4)
}));
const result = ['a', 'b', 'c', 'd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Throwing an error during `compose` (before waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

throw new Error('boom');
});

assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, /boom/).then(common.mustCall());
}

{
// Throwing an error during `compose` (when waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
for await (const chunk of stream) {
if (chunk === 3) {
throw new Error('boom');
}
yield chunk;
}
});

assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}

{
// Throwing an error during `compose` (after finishing all readable data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

// eslint-disable-next-line no-unused-vars,no-empty
for await (const chunk of stream) {
}

throw new Error('boom');
});
assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}

{
// AbortSignal
const ac = new AbortController();
const stream = Readable.from([1, 2, 3, 4, 5])
.compose(async function *(source) {
// Should not reach here
for await (const chunk of source) {
yield chunk;
}
}, { signal: ac.signal });

ac.abort();

assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
}


// TODO - add tests for argument validation