diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index cbcbf1fd69cc3b..813da0f8ef52b8 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -210,11 +210,13 @@ function fromAsyncGen(fn) { const value = fn(async function*() { while (true) { const { chunk, done, cb } = await promise; + promise = null; + resolve = null; process.nextTick(cb); if (done) return; if (signal.aborted) throw new AbortError(); - yield chunk; ({ promise, resolve } = createDeferredPromise()); + yield chunk; } }(), { signal }); diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index 265b61dfd062f9..51e739262f1f2c 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -2,7 +2,7 @@ const common = require('../common'); const assert = require('assert'); -const { Duplex, Readable, Writable } = require('stream'); +const { Duplex, Readable, Writable, pipeline } = require('stream'); { const d = Duplex.from({ @@ -118,3 +118,34 @@ const { Duplex, Readable, Writable } = require('stream'); assert.strictEqual(d.readable, false); })); } + +{ + // https://github.com/nodejs/node/issues/40497 + pipeline( + ['abc\ndef\nghi'], + Duplex.from(async function * (source) { + let rest = '' + for await (const chunk of source) { + console.error(0) + const lines = (rest + chunk.toString()).split('\n') + rest = lines.pop() + for (const line of lines) { + console.error(1, line) + yield line + console.error(2) + } + console.error(3) + } + console.error(4) + yield rest + }), + async function * (source) { + let ret = '' + for await (const x of source) { + ret += x + } + assert.strictEqual(ret, 'abcdefghi') + }, + common.mustCall(() => {}), + ) +}