Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: use callback to properly propagate error #29179

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 9 additions & 6 deletions lib/_stream_readable.js
Expand Up @@ -144,7 +144,10 @@ function ReadableState(options, stream, isDuplex) {
// Has it been destroyed
this.destroyed = false;

// Indicates whether the stream has errored.
// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = false;
ronag marked this conversation as resolved.
Show resolved Hide resolved

// Indicates whether the stream has finished destroying.
Expand Down Expand Up @@ -258,7 +261,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
addChunk(stream, state, chunk, true);
} else if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
} else if (state.destroyed || state.errored) {
return false;
} else {
state.reading = false;
Expand Down Expand Up @@ -453,9 +456,9 @@ Readable.prototype.read = function(n) {
}

// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed, then it's
// not allowed.
if (state.ended || state.reading || state.destroyed) {
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
if (state.ended || state.reading || state.destroyed || state.errored) {
doRead = false;
debug('reading or ended', doRead);
} else if (doRead) {
Expand Down Expand Up @@ -553,7 +556,7 @@ function emitReadable(stream) {
function emitReadable_(stream) {
const state = stream._readableState;
debug('emitReadable_', state.destroyed, state.length, state.ended);
if (!state.destroyed && (state.length || state.ended)) {
if (!state.destroyed && !state.errored && (state.length || state.ended)) {
stream.emit('readable');
state.emittedReadable = false;
}
Expand Down
7 changes: 7 additions & 0 deletions lib/_stream_writable.js
Expand Up @@ -416,6 +416,13 @@ function onwrite(stream, er) {

if (er) {
state.errored = true;

// In case of duplex streams we need to notify the readable side of the
// error.
if (stream._readableState) {
stream._readableState.errored = true;
}

if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
Expand Down
13 changes: 11 additions & 2 deletions lib/internal/http2/core.js
Expand Up @@ -1995,10 +1995,19 @@ class Http2Stream extends Duplex {

let req;

// writeGeneric does not destroy on error and we cannot enable autoDestroy,
// so make sure to destroy on error.
const callback = (err) => {
if (err) {
this.destroy(err);
}
cb(err);
};

if (writev)
req = writevGeneric(this, data, cb);
req = writevGeneric(this, data, callback);
else
req = writeGeneric(this, data, encoding, cb);
req = writeGeneric(this, data, encoding, callback);
ronag marked this conversation as resolved.
Show resolved Hide resolved

trackWriteState(this, req.bytes);
}
Expand Down
16 changes: 10 additions & 6 deletions lib/internal/stream_base_commons.js
Expand Up @@ -88,9 +88,14 @@ function onWriteComplete(status) {
return;
}

// TODO (ronag): This should be moved before if(stream.destroyed)
// in order to avoid swallowing error.
if (status < 0) {
const ex = errnoException(status, 'write', this.error);
stream.destroy(ex, this.callback);
if (typeof this.callback === 'function')
this.callback(ex);
else
stream.destroy(ex);
ronag marked this conversation as resolved.
Show resolved Hide resolved
return;
}

Expand Down Expand Up @@ -134,24 +139,24 @@ function writevGeneric(self, data, cb) {
// Retain chunks
if (err === 0) req._chunks = chunks;

afterWriteDispatched(self, req, err, cb);
afterWriteDispatched(req, err, cb);
return req;
}

function writeGeneric(self, data, encoding, cb) {
const req = createWriteWrap(self[kHandle]);
const err = handleWriteReq(req, data, encoding);

afterWriteDispatched(self, req, err, cb);
afterWriteDispatched(req, err, cb);
return req;
}

function afterWriteDispatched(self, req, err, cb) {
function afterWriteDispatched(req, err, cb) {
req.bytes = streamBaseState[kBytesWritten];
req.async = !!streamBaseState[kLastWriteWasAsync];

if (err !== 0)
return self.destroy(errnoException(err, 'write', req.error), cb);
return cb(errnoException(err, 'write', req.error));
mcollina marked this conversation as resolved.
Show resolved Hide resolved

if (!req.async) {
cb();
Expand Down Expand Up @@ -264,7 +269,6 @@ function setStreamTimeout(msecs, callback) {
}

module.exports = {
createWriteWrap,
writevGeneric,
writeGeneric,
onStreamRead,
Expand Down
56 changes: 56 additions & 0 deletions test/parallel/test-net-connect-buffer2.js
@@ -0,0 +1,56 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');

const tcp = net.Server(common.mustCall((s) => {
tcp.close();

let buf = '';
s.setEncoding('utf8');
s.on('data', function(d) {
buf += d;
});

s.on('end', common.mustCall(function() {
console.error('SERVER: end', buf);
assert.strictEqual(buf, "L'État, c'est moi");
s.end();
}));
}));

tcp.listen(0, common.mustCall(function() {
const socket = net.Stream({ highWaterMark: 0 });

let connected = false;
assert.strictEqual(socket.pending, true);
socket.connect(this.address().port, common.mustCall(() => connected = true));

assert.strictEqual(socket.pending, true);
assert.strictEqual(socket.connecting, true);
assert.strictEqual(socket.readyState, 'opening');

// Write a string that contains a multi-byte character sequence to test that
// `bytesWritten` is incremented with the # of bytes, not # of characters.
const a = "L'État, c'est ";
const b = 'moi';

// We're still connecting at this point so the datagram is first pushed onto
// the connect queue. Make sure that it's not added to `bytesWritten` again
// when the actual write happens.
const r = socket.write(a, common.mustCall((er) => {
console.error('write cb');
ronag marked this conversation as resolved.
Show resolved Hide resolved
assert.ok(connected);
assert.strictEqual(socket.bytesWritten, Buffer.from(a + b).length);
assert.strictEqual(socket.pending, false);
}));
socket.on('close', common.mustCall(() => {
assert.strictEqual(socket.pending, true);
}));

assert.strictEqual(socket.bytesWritten, Buffer.from(a).length);
assert.strictEqual(r, false);
socket.end(b);

assert.strictEqual(socket.readyState, 'opening');
}));
1 change: 1 addition & 0 deletions test/parallel/test-net-write-arguments.js
Expand Up @@ -25,6 +25,7 @@ assert.throws(() => {
[],
{}
].forEach((value) => {
const socket = net.Stream({ highWaterMark: 0 });
// We need to check the callback since 'error' will only
// be emitted once per instance.
assert.throws(() => {
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-wrap-js-stream-exceptions.js
Expand Up @@ -10,7 +10,7 @@ process.once('uncaughtException', common.mustCall((err) => {
}));

const socket = new JSStreamWrap(new Duplex({
read: common.mustNotCall(),
read: common.mustCall(),
ronag marked this conversation as resolved.
Show resolved Hide resolved
write: common.mustCall((buffer, data, cb) => {
throw new Error('exception!');
})
Expand Down