From 06e5c0ee39bf96bbda057a3cce900dccd4af3ff0 Mon Sep 17 00:00:00 2001 From: Xuguang Mei Date: Thu, 3 Mar 2022 12:53:59 +0800 Subject: [PATCH] stream: use .chunk when calling adapters's writev Fix: https://github.com/nodejs/node/issues/42157 PR-URL: https://github.com/nodejs/node/pull/42161 Fixes: https://github.com/nodejs/node/issues/42157 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina --- lib/internal/webstreams/adapters.js | 10 ++++++---- ...hatwg-webstreams-adapters-to-streamduplex.js | 17 +++++++++++++++++ ...twg-webstreams-adapters-to-streamwritable.js | 5 ++++- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index a81c173e4714e9..1058c1c0356ead 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -228,8 +228,9 @@ function newStreamWritableFromWritableStream(writableStream, options = {}) { writev(chunks, callback) { function done(error) { + error = error.filter((e) => e); try { - callback(error); + callback(error.length === 0 ? undefined : error); } catch (error) { // In a next tick because this is happening within // a promise context, and if there are any errors @@ -247,7 +248,7 @@ function newStreamWritableFromWritableStream(writableStream, options = {}) { PromiseAll( ArrayPrototypeMap( chunks, - (chunk) => writer.write(chunk))), + (data) => writer.write(data.chunk))), done, done); }, @@ -633,8 +634,9 @@ function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) { writev(chunks, callback) { function done(error) { + error = error.filter((e) => e); try { - callback(error); + callback(error.length === 0 ? undefined : error); } catch (error) { // In a next tick because this is happening within // a promise context, and if there are any errors @@ -652,7 +654,7 @@ function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) { PromiseAll( ArrayPrototypeMap( chunks, - (chunk) => writer.write(chunk))), + (data) => writer.write(data.chunk))), done, done); }, diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js b/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js index 4567c53c1e89a0..15ac9f832714e9 100644 --- a/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js +++ b/test/parallel/test-whatwg-webstreams-adapters-to-streamduplex.js @@ -147,3 +147,20 @@ const { finished(duplex, common.mustCall()); pipeline(readable, duplex, writable, common.mustCall()); } + +{ + const transform = new TransformStream(); + const duplex = newStreamDuplexFromReadableWritablePair(transform); + duplex.setEncoding('utf-8'); + duplex.on('data', common.mustCall((data) => { + assert.strictEqual(data, 'hello'); + }, 5)); + + duplex.write(Buffer.from('hello')); + duplex.write(Buffer.from('hello')); + duplex.write(Buffer.from('hello')); + duplex.write(Buffer.from('hello')); + duplex.write(Buffer.from('hello')); + + duplex.end(); +} diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js b/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js index 8da73b4fe9f0c3..495eef73f79272 100644 --- a/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js +++ b/test/parallel/test-whatwg-webstreams-adapters-to-streamwritable.js @@ -200,7 +200,7 @@ class TestSource { { const writableStream = new WritableStream({ - write: common.mustCall(2), + write: common.mustCall(5), close: common.mustCall(), }); const writable = newStreamWritableFromWritableStream(writableStream); @@ -208,6 +208,9 @@ class TestSource { finished(writable, common.mustCall()); writable.write('hello'); + writable.write('hello'); + writable.write('hello'); + writable.write('world'); writable.write('world'); writable.end(); }