From f300f197da710c1b18a0693c6119cb3ffc7f29ca Mon Sep 17 00:00:00 2001 From: Daeyeon Jeong Date: Sun, 2 Oct 2022 19:14:57 +0900 Subject: [PATCH] stream: handle enqueuing chunks when a pending BYOB pull request exists Signed-off-by: Daeyeon Jeong PR-URL: https://github.com/nodejs/node/pull/44770 Refs: https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina --- lib/internal/webstreams/readablestream.js | 72 ++++++++++++++++++----- test/wpt/status/streams.json | 11 ---- 2 files changed, 58 insertions(+), 25 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 51e6ca149b1a88..ade543c957dd5a 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -2647,13 +2647,22 @@ function readableByteStreamControllerEnqueue(controller, chunk) { ); } - firstPendingPullInto.buffer = - transferArrayBuffer(firstPendingPullInto.buffer); - } + readableByteStreamControllerInvalidateBYOBRequest(controller); - readableByteStreamControllerInvalidateBYOBRequest(controller); + firstPendingPullInto.buffer = transferArrayBuffer( + firstPendingPullInto.buffer + ); + + if (firstPendingPullInto.type === 'none') { + readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + firstPendingPullInto + ); + } + } if (readableStreamHasDefaultReader(stream)) { + readableByteStreamControllerProcessReadRequestsUsingQueue(controller); if (!readableStreamGetNumReadRequests(stream)) { readableByteStreamControllerEnqueueChunkToQueue( controller, @@ -2662,6 +2671,10 @@ function readableByteStreamControllerEnqueue(controller, chunk) { byteLength); } else { assert(!queue.length); + if (pendingPullIntos.length) { + assert(pendingPullIntos[0].type === 'default'); + readableByteStreamControllerShiftPendingPullInto(controller); + } const transferredView = new Uint8Array(transferredBuffer, byteOffset, byteLength); readableStreamFulfillReadRequest(stream, transferredView, false); @@ -2984,25 +2997,56 @@ function readableByteStreamControllerCancelSteps(controller, reason) { return result; } +function readableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) { + const { + queue, + queueTotalSize, + } = controller[kState]; + assert(queueTotalSize > 0); + const { + buffer, + byteOffset, + byteLength, + } = ArrayPrototypeShift(queue); + + controller[kState].queueTotalSize -= byteLength; + readableByteStreamControllerHandleQueueDrain(controller); + const view = new Uint8Array(buffer, byteOffset, byteLength); + readRequest[kChunk](view); +} + +function readableByteStreamControllerProcessReadRequestsUsingQueue(controller) { + const { + stream, + queueTotalSize, + } = controller[kState]; + const { reader } = stream[kState]; + assert(isReadableStreamDefaultReader(reader)); + + while (reader[kState].readRequests.length > 0) { + if (queueTotalSize === 0) { + return; + } + readableByteStreamControllerFillReadRequestFromQueue( + controller, + ArrayPrototypeShift(reader[kState].readRequests), + ); + } +} + function readableByteStreamControllerPullSteps(controller, readRequest) { const { pendingPullIntos, - queue, queueTotalSize, stream, } = controller[kState]; assert(readableStreamHasDefaultReader(stream)); if (queueTotalSize) { assert(!readableStreamGetNumReadRequests(stream)); - const { - buffer, - byteOffset, - byteLength, - } = ArrayPrototypeShift(queue); - controller[kState].queueTotalSize -= byteLength; - readableByteStreamControllerHandleQueueDrain(controller); - const view = new Uint8Array(buffer, byteOffset, byteLength); - readRequest[kChunk](view); + readableByteStreamControllerFillReadRequestFromQueue( + controller, + readRequest + ); return; } const { diff --git a/test/wpt/status/streams.json b/test/wpt/status/streams.json index 2e8e931e697247..166bcf869080d2 100644 --- a/test/wpt/status/streams.json +++ b/test/wpt/status/streams.json @@ -12,17 +12,6 @@ ] } }, - "readable-byte-streams/general.any.js": { - "fail": { - "expected": [ - "ReadableStream with byte source: enqueue() discards auto-allocated BYOB request", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()", - "ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()" - ] - } - }, "readable-streams/cross-realm-crash.window.js": { "skip": "Browser-specific test" },