From 37b200c4b60d52246774b22d8b41d0e91b937cc3 Mon Sep 17 00:00:00 2001 From: RafaelGSS Date: Sat, 20 Nov 2021 23:44:38 -0300 Subject: [PATCH] streams: fix enqueue race condition on esm modules streams: use nextTick on close fix: lint --- lib/internal/webstreams/readablestream.js | 16 +++++---- test/parallel/test-whatwg-readablestream.mjs | 37 ++++++++++++++++++++ 2 files changed, 46 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-whatwg-readablestream.mjs diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index fe9b26b991f04e..b4d989febeca75 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1424,13 +1424,15 @@ function readableStreamTee(stream, cloneForBranch2) { }); }, [kClose]() { - reading = false; - if (!canceled1) - readableStreamDefaultControllerClose(branch1[kState].controller); - if (!canceled2) - readableStreamDefaultControllerClose(branch2[kState].controller); - if (!canceled1 || !canceled2) - cancelPromise.resolve(); + process.nextTick(() => { + reading = false; + if (!canceled1) + readableStreamDefaultControllerClose(branch1[kState].controller); + if (!canceled2) + readableStreamDefaultControllerClose(branch2[kState].controller); + if (!canceled1 || !canceled2) + cancelPromise.resolve(); + }); }, [kError]() { reading = false; diff --git a/test/parallel/test-whatwg-readablestream.mjs b/test/parallel/test-whatwg-readablestream.mjs new file mode 100644 index 00000000000000..5f94a10914219a --- /dev/null +++ b/test/parallel/test-whatwg-readablestream.mjs @@ -0,0 +1,37 @@ +import { mustCall } from '../common/index.mjs'; +import { ReadableStream } from 'stream/web'; +import assert from 'assert'; + +{ + // Test tee() with close in the nextTick after enqueue + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return Buffer.concat(chunks).toString(); + } + + const [r1, r2] = new ReadableStream({ + start(controller) { + process.nextTick(() => { + controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114])); + + process.nextTick(() => { + controller.close(); + }); + }); + } + }).tee(); + + (async () => { + const [dataReader1, dataReader2] = await Promise.all([ + read(r1), + read(r2), + ]); + + assert.strictEqual(dataReader1, dataReader2); + assert.strictEqual(dataReader1, 'foobar'); + assert.strictEqual(dataReader2, 'foobar'); + })().then(mustCall()); +} +