diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index bfcd51233ac786..68b47f1e8ce9cc 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -20,6 +20,7 @@ const { const { destroyer } = require('internal/streams/destroy'); const Duplex = require('internal/streams/duplex'); const Readable = require('internal/streams/readable'); +const Writable = require('internal/streams/writable'); const { createDeferredPromise } = require('internal/util'); const from = require('internal/streams/from'); @@ -32,6 +33,16 @@ const { FunctionPrototypeCall } = primordials; + +const { + isBrandCheck, +} = require('internal/webstreams/util'); + +const isReadableStream = + isBrandCheck('ReadableStream'); +const isWritableStream = + isBrandCheck('WritableStream'); + // This is needed for pre node 17. class Duplexify extends Duplex { constructor(options) { @@ -71,15 +82,13 @@ module.exports = function duplexify(body, name) { return _duplexify({ writable: false, readable: false }); } - // TODO: Webstreams - // if (isReadableStream(body)) { - // return _duplexify({ readable: Readable.fromWeb(body) }); - // } + if (isReadableStream(body)) { + return _duplexify({ readable: Readable.fromWeb(body) }); + } - // TODO: Webstreams - // if (isWritableStream(body)) { - // return _duplexify({ writable: Writable.fromWeb(body) }); - // } + if (isWritableStream(body)) { + return _duplexify({ writable: Writable.fromWeb(body) }); + } if (typeof body === 'function') { const { value, write, final, destroy } = fromAsyncGen(body); @@ -146,13 +155,12 @@ module.exports = function duplexify(body, name) { }); } - // TODO: Webstreams. - // if ( - // isReadableStream(body?.readable) && - // isWritableStream(body?.writable) - // ) { - // return Duplexify.fromWeb(body); - // } + if ( + isReadableStream(body?.readable) && + isWritableStream(body?.writable) + ) { + return Duplexify.fromWeb(body); + } if ( typeof body?.writable === 'object' || diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index c3f3dd756b2e66..e3c117ff8dedb0 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -3,6 +3,7 @@ const common = require('../common'); const assert = require('assert'); const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream'); +const { ReadableStream, WritableStream } = require('stream/web'); const { Blob } = require('buffer'); { @@ -299,3 +300,104 @@ const { Blob } = require('buffer'); assert.strictEqual(res, 'foobar'); })).on('close', common.mustCall()); } + +function makeATestReadableStream(value) { + return new ReadableStream({ + start(controller) { + controller.enqueue(value); + controller.close(); + } + }); +} + +function makeATestWritableStream(writeFunc) { + return new WritableStream({ + write(chunk) { + writeFunc(chunk); + } + }); +} + +{ + const d = Duplex.from({ + readable: makeATestReadableStream('foo'), + }); + assert.strictEqual(d.readable, true); + assert.strictEqual(d.writable, false); + + d.on('data', common.mustCall((data) => { + assert.strictEqual(data.toString(), 'foo'); + })); + + d.on('end', common.mustCall(() => { + assert.strictEqual(d.readable, false); + })); +} + +{ + const d = Duplex.from(makeATestReadableStream('foo')); + + assert.strictEqual(d.readable, true); + assert.strictEqual(d.writable, false); + + d.on('data', common.mustCall((data) => { + assert.strictEqual(data.toString(), 'foo'); + })); + + d.on('end', common.mustCall(() => { + assert.strictEqual(d.readable, false); + })); +} + +{ + let ret = ''; + const d = Duplex.from({ + writable: makeATestWritableStream((chunk) => ret += chunk), + }); + + assert.strictEqual(d.readable, false); + assert.strictEqual(d.writable, true); + + d.end('foo'); + d.on('finish', common.mustCall(() => { + assert.strictEqual(ret, 'foo'); + assert.strictEqual(d.writable, false); + })); +} + +{ + let ret = ''; + const d = Duplex.from(makeATestWritableStream((chunk) => ret += chunk)); + + assert.strictEqual(d.readable, false); + assert.strictEqual(d.writable, true); + + d.end('foo'); + d.on('finish', common.mustCall(() => { + assert.strictEqual(ret, 'foo'); + assert.strictEqual(d.writable, false); + })); +} + +{ + let ret = ''; + const d = Duplex.from({ + readable: makeATestReadableStream('foo'), + writable: makeATestWritableStream((chunk) => ret += chunk), + }); + + d.end('bar'); + + d.on('data', common.mustCall((data) => { + assert.strictEqual(data.toString(), 'foo'); + })); + + d.on('end', common.mustCall(() => { + assert.strictEqual(d.readable, false); + })); + + d.on('finish', common.mustCall(() => { + assert.strictEqual(ret, 'bar'); + assert.strictEqual(d.writable, false); + })); +}