Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: implement finished() for ReadableStream and WritableStream #46205

Merged
merged 16 commits into from Jan 18, 2023
Merged
25 changes: 20 additions & 5 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -22,20 +22,23 @@ const {
validateBoolean
} = require('internal/validators');

const { Promise } = primordials;
const { Promise, PromisePrototypeThen } = primordials;

const {
isClosed,
isReadable,
isReadableNodeStream,
isReadableStream,
isReadableFinished,
isReadableErrored,
isWritable,
isWritableNodeStream,
isWritableStream,
isWritableFinished,
isWritableErrored,
isNodeStream,
willEmitClose: _willEmitClose,
kIsClosedPromise,
} = require('internal/streams/utils');

function isRequest(stream) {
Expand All @@ -58,14 +61,17 @@ function eos(stream, options, callback) {

callback = once(callback);

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);
if (isReadableStream(stream) || isWritableStream(stream)) {
return eosWeb(stream, options, callback);
}
Comment on lines +64 to +66
Copy link
Contributor

@kanongil kanongil Apr 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These match non-node WebStreams – which I guess is fine, except that eosWeb() will throw a TypeError since the private property kIsClosedPromise will not exist on it. This will in turn mean that the callback is never called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in you want eosWeb to additionally have a check to throw TypeError ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what node should do – probably still throw some kind of error.

Currently non-node WebStreams would just throw a TypeError from accessing the promise property on the undefined kIsClosedPromise property here:

stream[kIsClosedPromise].promise,


if (!isNodeStream(stream)) {
ronag marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Webstreams.
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream);
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
}

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);

const wState = stream._writableState;
const rState = stream._readableState;

Expand Down Expand Up @@ -255,6 +261,15 @@ function eos(stream, options, callback) {
return cleanup;
}

function eosWeb(stream, opts, callback) {
RaisinTen marked this conversation as resolved.
Show resolved Hide resolved
PromisePrototypeThen(
stream[kIsClosedPromise].promise,
() => process.nextTick(() => callback.call(stream)),
(err) => process.nextTick(() => callback.call(stream, err)),
);
return nop;
}

function finished(stream, opts) {
let autoCleanup = false;
if (opts === null) {
Expand Down
25 changes: 25 additions & 0 deletions lib/internal/streams/utils.js
Expand Up @@ -4,13 +4,16 @@ const {
Symbol,
SymbolAsyncIterator,
SymbolIterator,
SymbolFor,
} = primordials;

const kDestroyed = Symbol('kDestroyed');
const kIsErrored = Symbol('kIsErrored');
const kIsReadable = Symbol('kIsReadable');
const kIsDisturbed = Symbol('kIsDisturbed');

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');

function isReadableNodeStream(obj, strict = false) {
return !!(
obj &&
Expand Down Expand Up @@ -55,6 +58,25 @@ function isNodeStream(obj) {
);
}

function isReadableStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.pipeThrough === 'function' &&
typeof obj.getReader === 'function' &&
typeof obj.cancel === 'function'
);
}

function isWritableStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.getWriter === 'function' &&
typeof obj.abort === 'function'
);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
Expand Down Expand Up @@ -269,18 +291,21 @@ module.exports = {
kIsErrored,
isReadable,
kIsReadable,
kIsClosedPromise,
isClosed,
isDestroyed,
isDuplexNodeStream,
isFinished,
isIterable,
isReadableNodeStream,
isReadableStream,
isReadableEnded,
isReadableFinished,
isReadableErrored,
isNodeStream,
isWritable,
isWritableNodeStream,
isWritableStream,
isWritableEnded,
isWritableFinished,
isWritableErrored,
Expand Down
14 changes: 11 additions & 3 deletions lib/internal/webstreams/readablestream.js
Expand Up @@ -84,6 +84,7 @@ const {
kIsDisturbed,
kIsErrored,
kIsReadable,
kIsClosedPromise,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -231,9 +232,11 @@ class ReadableStream {
port1: undefined,
port2: undefined,
promise: undefined,
}
},
};

this[kIsClosedPromise] = createDeferredPromise();

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
// highWaterMark from the strategy fail, that has
Expand Down Expand Up @@ -625,8 +628,9 @@ function TransferredReadableStream() {
writable: undefined,
port: undefined,
promise: undefined,
}
},
};
this[kIsClosedPromise] = createDeferredPromise();
},
[], ReadableStream));
}
Expand Down Expand Up @@ -1195,8 +1199,9 @@ function createTeeReadableStream(start, pull, cancel) {
writable: undefined,
port: undefined,
promise: undefined,
}
},
};
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
this,
ObjectCreate(null, {
Expand Down Expand Up @@ -1869,6 +1874,7 @@ function readableStreamCancel(stream, reason) {
function readableStreamClose(stream) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'closed';
stream[kIsClosedPromise].resolve();

const {
reader,
Expand All @@ -1890,6 +1896,8 @@ function readableStreamError(stream, error) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'errored';
stream[kState].storedError = error;
stream[kIsClosedPromise].reject(error);
setPromiseHandled(stream[kIsClosedPromise].promise);

const {
reader
Expand Down
14 changes: 13 additions & 1 deletion lib/internal/webstreams/writablestream.js
Expand Up @@ -67,6 +67,10 @@ const {
kState,
} = require('internal/webstreams/util');

const {
kIsClosedPromise,
} = require('internal/streams/utils');

const {
AbortController,
} = require('internal/abort_controller');
Expand Down Expand Up @@ -175,9 +179,11 @@ class WritableStream {
port1: undefined,
port2: undefined,
promise: undefined,
}
},
};

this[kIsClosedPromise] = createDeferredPromise();

const size = extractSizeAlgorithm(strategy?.size);
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);

Expand Down Expand Up @@ -347,6 +353,7 @@ function TransferredWritableStream() {
readable: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();
},
[], WritableStream));
}
Expand Down Expand Up @@ -726,6 +733,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
resolve: undefined,
};
}

stream[kIsClosedPromise].reject(stream[kState]?.storedError);
setPromiseHandled(stream[kIsClosedPromise].promise);

const {
writer,
} = stream[kState];
Expand Down Expand Up @@ -839,6 +850,7 @@ function writableStreamFinishInFlightClose(stream) {
stream[kState].state = 'closed';
if (stream[kState].writer !== undefined)
stream[kState].writer[kState].close.resolve?.();
stream[kIsClosedPromise].resolve?.();
assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
assert(stream[kState].storedError === undefined);
}
Expand Down