Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: enable usage of webstreams on compose() #46675

Merged
merged 12 commits into from
Feb 27, 2023
3 changes: 2 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2802,7 +2802,8 @@ added: v16.9.0

> Stability: 1 - `stream.compose` is experimental.

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
Expand Down
191 changes: 138 additions & 53 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const {
isNodeStream,
isReadable,
isWritable,
isWebStream,
isTransformStream,
isWritableStream,
isReadableStream,
} = require('internal/streams/utils');
const {
AbortError,
Expand All @@ -15,6 +19,7 @@ const {
ERR_MISSING_ARGS,
},
} = require('internal/errors');
const eos = require('internal/streams/end-of-stream');

module.exports = function compose(...streams) {
if (streams.length === 0) {
Expand All @@ -37,18 +42,32 @@ module.exports = function compose(...streams) {
}

for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue;
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
if (
n < streams.length - 1 &&
!(
isReadable(streams[n]) ||
isReadableStream(streams[n]) ||
isTransformStream(streams[n])
)
) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be readable'
);
}
if (n > 0 && !isWritable(streams[n])) {
if (
n > 0 &&
!(
isWritable(streams[n]) ||
isWritableStream(streams[n]) ||
isTransformStream(streams[n])
)
) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
Expand Down Expand Up @@ -79,8 +98,16 @@ module.exports = function compose(...streams) {
const head = streams[0];
const tail = pipeline(streams, onfinished);

const writable = !!isWritable(head);
const readable = !!isReadable(tail);
const writable = !!(
isWritable(head) ||
isWritableStream(head) ||
isTransformStream(head)
);
const readable = !!(
isReadable(tail) ||
isReadableStream(tail) ||
isTransformStream(tail)
);

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
Expand All @@ -94,63 +121,119 @@ module.exports = function compose(...streams) {
});

if (writable) {
d._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
head.end();
onfinish = callback;
};

head.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});
if (isNodeStream(head)) {
d._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

tail.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
d._final = function(callback) {
head.end();
onfinish = callback;
};

head.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});
} else if (isWebStream(head)) {
const writable = isTransformStream(head) ? head.writable : head;
const writer = writable.getWriter();

d._write = async function(chunk, encoding, callback) {
try {
await writer.ready;
writer.write(chunk).catch(() => {});
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
callback();
} catch (err) {
callback(err);
}
};

d._final = async function(callback) {
try {
await writer.ready;
writer.close().catch(() => {});
onfinish = callback;
} catch (err) {
callback(err);
}
};
}
if (isNodeStream(tail)) {
tail.on('finish', function() {
debadree25 marked this conversation as resolved.
Show resolved Hide resolved
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
eos(readable, () => {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}
}

if (readable) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});
if (isNodeStream(tail)) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});

tail.on('end', function() {
d.push(null);
});
tail.on('end', function() {
d.push(null);
});

d._read = function() {
while (true) {
const buf = tail.read();
d._read = function() {
while (true) {
const buf = tail.read();
if (buf === null) {
onreadable = d._read;
return;
}

if (buf === null) {
onreadable = d._read;
return;
if (!d.push(buf)) {
return;
}
}
};
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
const reader = readable.getReader();
d._read = async function() {
while (true) {
try {
const { value, done } = await reader.read();
if (done) {
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
d.push(null);
return;
}

if (!d.push(buf)) {
return;
if (!d.push(value)) {
return;
}
} catch {
return;
}
}
}
};
};
}
}

d._destroy = function(err, callback) {
Expand All @@ -166,7 +249,9 @@ module.exports = function compose(...streams) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
if (isNodeStream(tail)) {
destroyer(tail, err);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some help is needed here, how could we destroy webstreams here? or should we even?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on the question? (We can destroy web streams the question is what scenario do you specifically mean)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the pipeline encounters an error, it would call d.destroy, which in turn would destroy the last stream in the series the tail stream should the same happen for webstreams too I think we could do writableStream.abort() here.

Actually, i am a little confused why destroying the last stream is necessary 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one question remain wdyt @benjamingr ?

}
}
};

Expand Down
3 changes: 2 additions & 1 deletion lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ function pipelineImpl(streams, callback, opts) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
ret = stream;
} else {
ret = Duplex.from(stream);
Expand Down Expand Up @@ -384,6 +384,7 @@ function pipelineImpl(streams, callback, opts) {
finishCount++;
pumpToWeb(ret, stream, finish, { end });
} else if (isTransformStream(ret)) {
finishCount++;
pumpToWeb(ret.readable, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
Expand Down