diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 08c196802780b8..b6e744250799c6 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -6,11 +6,13 @@ const { } = primordials; function isReadable(obj) { - return !!(obj && typeof obj.pipe === 'function'); + return !!(obj && typeof obj.pipe === 'function' && + typeof obj.on === 'function'); } function isWritable(obj) { - return !!(obj && typeof obj.write === 'function'); + return !!(obj && typeof obj.write === 'function' && + typeof obj.on === 'function'); } function isStream(obj) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 7940d3a90d1828..ed5a3d9a0b54f4 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1371,3 +1371,19 @@ const net = require('net'); assert.strictEqual(res, '123'); })); } + +{ + const content = 'abc'; + pipeline(Buffer.from(content), PassThrough({ objectMode: true }), + common.mustSucceed(() => {})); + + let res = ''; + pipeline(Buffer.from(content), async function*(previous) { + for await (const val of previous) { + res += String.fromCharCode(val); + yield val; + } + }, common.mustSucceed(() => { + assert.strictEqual(res, content); + })); +}