Skip to content

Commit

Permalink
stream: support async for stream impl functions
Browse files Browse the repository at this point in the history
PR-URL: #34416
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anto Aravinth <anto.aravinth.cse@gmail.com>
  • Loading branch information
jasnell committed Aug 4, 2020
1 parent ca26eae commit 744a284
Show file tree
Hide file tree
Showing 4 changed files with 453 additions and 5 deletions.
67 changes: 65 additions & 2 deletions lib/_stream_transform.js
Expand Up @@ -107,8 +107,10 @@ function Transform(options) {
}

function final(cb) {
let called = false;
if (typeof this._flush === 'function' && !this.destroyed) {
this._flush((er, data) => {
const result = this._flush((er, data) => {
called = true;
if (er) {
if (cb) {
cb(er);
Expand All @@ -126,6 +128,33 @@ function final(cb) {
cb();
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
(data) => {
if (called)
return;
if (data != null)
this.push(data);
this.push(null);
if (cb)
process.nextTick(cb);
},
(err) => {
if (cb) {
process.nextTick(cb, err);
} else {
process.nextTick(() => this.destroy(err));
}
});
}
} catch (err) {
process.nextTick(() => this.destroy(err));
}
}
} else {
this.push(null);
if (cb) {
Expand All @@ -151,7 +180,9 @@ Transform.prototype._write = function(chunk, encoding, callback) {
const wState = this._writableState;
const length = rState.length;

this._transform(chunk, encoding, (err, val) => {
let called = false;
const result = this._transform(chunk, encoding, (err, val) => {
called = true;
if (err) {
callback(err);
return;
Expand All @@ -172,6 +203,38 @@ Transform.prototype._write = function(chunk, encoding, callback) {
this[kCallback] = callback;
}
});
if (result !== undefined && result != null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
(val) => {
if (called)
return;

if (val != null) {
this.push(val);
}

if (
wState.ended ||
length === rState.length ||
rState.length < rState.highWaterMark ||
rState.length === 0) {
process.nextTick(callback);
} else {
this[kCallback] = callback;
}
},
(err) => {
process.nextTick(callback, err);
});
}
} catch (err) {
process.nextTick(callback, err);
}
}
};

Transform.prototype._read = function() {
Expand Down
27 changes: 26 additions & 1 deletion lib/_stream_writable.js
Expand Up @@ -647,7 +647,7 @@ function needFinish(state) {
function callFinal(stream, state) {
state.sync = true;
state.pendingcb++;
stream._final((err) => {
const result = stream._final((err) => {
state.pendingcb--;
if (err) {
for (const callback of state[kOnFinished].splice(0)) {
Expand All @@ -664,6 +664,31 @@ function callFinal(stream, state) {
process.nextTick(finish, stream, state);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (state.prefinished)
return;
state.prefinish = true;
process.nextTick(() => stream.emit('prefinish'));
state.pendingcb++;
process.nextTick(finish, stream, state);
},
function(err) {
for (const callback of state[kOnFinished].splice(0)) {
process.nextTick(callback, err);
}
process.nextTick(errorOrDestroy, stream, err, state.sync);
});
}
} catch (err) {
process.nextTick(errorOrDestroy, stream, err, state.sync);
}
}
state.sync = false;
}

Expand Down
106 changes: 104 additions & 2 deletions lib/internal/streams/destroy.js
Expand Up @@ -59,10 +59,13 @@ function destroy(err, cb) {
}

function _destroy(self, err, cb) {
self._destroy(err || null, (err) => {
let called = false;
const result = self._destroy(err || null, (err) => {
const r = self._readableState;
const w = self._writableState;

called = true;

if (err) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack;
Expand Down Expand Up @@ -92,6 +95,64 @@ function _destroy(self, err, cb) {
process.nextTick(emitCloseNT, self);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (called)
return;

const r = self._readableState;
const w = self._writableState;

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb);
}

process.nextTick(emitCloseNT, self);
},
function(err) {
const r = self._readableState;
const w = self._writableState;
err.stack;

called = true;

if (w && !w.errored) {
w.errored = err;
}
if (r && !r.errored) {
r.errored = err;
}

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb, err);
}

process.nextTick(emitErrorCloseNT, self, err);
});
}
} catch (err) {
process.nextTick(emitErrorNT, self, err);
}
}
}

function emitErrorCloseNT(self, err) {
Expand Down Expand Up @@ -230,7 +291,7 @@ function constructNT(stream) {
const s = w || r;

let called = false;
stream._construct((err) => {
const result = stream._construct((err) => {
if (r) {
r.constructed = true;
}
Expand All @@ -252,6 +313,47 @@ function constructNT(stream) {
process.nextTick(emitConstructNT, stream);
}
});
if (result !== undefined && result !== null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
// If the callback was invoked, do nothing further.
if (called)
return;
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy));
} else {
process.nextTick(emitConstructNT, stream);
}
},
function(err) {
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
called = true;
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy, err));
} else {
process.nextTick(errorOrDestroy, stream, err);
}
});
}
} catch (err) {
process.nextTick(emitErrorNT, stream, err);
}
}
}

function emitConstructNT(stream) {
Expand Down

0 comments on commit 744a284

Please sign in to comment.