diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index ef847c110edc23..8547957944af25 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -144,7 +144,10 @@ function ReadableState(options, stream, isDuplex) {
   // Has it been destroyed
   this.destroyed = false;
 
-  // Indicates whether the stream has errored.
+  // Indicates whether the stream has errored. When true, no further
+  // _read calls, 'data' or 'readable' events should occur. This is needed
+  // since, when autoDestroy is disabled, we need a way to tell whether the
+  // stream has failed.
   this.errored = false;
 
   // Indicates whether the stream has finished destroying.
@@ -258,7 +261,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
         addChunk(stream, state, chunk, true);
       } else if (state.ended) {
         errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
-      } else if (state.destroyed) {
+      } else if (state.destroyed || state.errored) {
         return false;
       } else {
         state.reading = false;
@@ -453,9 +456,9 @@ Readable.prototype.read = function(n) {
   }
 
   // However, if we've ended, then there's no point, if we're already
-  // reading, then it's unnecessary, and if we're destroyed, then it's
-  // not allowed.
-  if (state.ended || state.reading || state.destroyed) {
+  // reading, then it's unnecessary, and if we're destroyed or errored,
+  // then it's not allowed.
+  if (state.ended || state.reading || state.destroyed || state.errored) {
     doRead = false;
     debug('reading or ended', doRead);
   } else if (doRead) {
@@ -553,7 +556,7 @@ function emitReadable(stream) {
 function emitReadable_(stream) {
   const state = stream._readableState;
   debug('emitReadable_', state.destroyed, state.length, state.ended);
-  if (!state.destroyed && (state.length || state.ended)) {
+  if (!state.destroyed && !state.errored && (state.length || state.ended)) {
     stream.emit('readable');
     state.emittedReadable = false;
   }
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index c3a7a35d2b3f6f..5368bbef358190 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -416,6 +416,13 @@ function onwrite(stream, er) {
 
   if (er) {
     state.errored = true;
+
+    // In case of duplex streams, we need to notify the readable side of the
+    // error.
+    if (stream._readableState) {
+      stream._readableState.errored = true;
+    }
+
     if (sync) {
       process.nextTick(onwriteError, stream, state, er, cb);
     } else {
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 204e84556b0b3b..9407c9c9a1d3cf 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -1995,10 +1995,19 @@ class Http2Stream extends Duplex {
 
     let req;
 
+    // writeGeneric does not destroy on error and we cannot enable autoDestroy,
+    // so make sure to destroy on error.
+    const callback = (err) => {
+      if (err) {
+        this.destroy(err);
+      }
+      cb(err);
+    };
+
     if (writev)
-      req = writevGeneric(this, data, cb);
+      req = writevGeneric(this, data, callback);
     else
-      req = writeGeneric(this, data, encoding, cb);
+      req = writeGeneric(this, data, encoding, callback);
 
     trackWriteState(this, req.bytes);
   }
diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js
index 2a31221f11d2d6..0278e7240c34b1 100644
--- a/lib/internal/stream_base_commons.js
+++ b/lib/internal/stream_base_commons.js
@@ -88,9 +88,14 @@ function onWriteComplete(status) {
     return;
   }
 
+  // TODO (ronag): This should be moved before the if (stream.destroyed)
+  // check in order to avoid swallowing errors.
   if (status < 0) {
     const ex = errnoException(status, 'write', this.error);
-    stream.destroy(ex, this.callback);
+    if (typeof this.callback === 'function')
+      this.callback(ex);
+    else
+      stream.destroy(ex);
     return;
   }
 
@@ -134,7 +139,7 @@ function writevGeneric(self, data, cb) {
 
   // Retain chunks
   if (err === 0) req._chunks = chunks;
 
-  afterWriteDispatched(self, req, err, cb);
+  afterWriteDispatched(req, err, cb);
   return req;
 }
@@ -142,16 +147,16 @@ function writeGeneric(self, data, encoding, cb) {
   const req = createWriteWrap(self[kHandle]);
   const err = handleWriteReq(req, data, encoding);
 
-  afterWriteDispatched(self, req, err, cb);
+  afterWriteDispatched(req, err, cb);
   return req;
 }
 
-function afterWriteDispatched(self, req, err, cb) {
+function afterWriteDispatched(req, err, cb) {
   req.bytes = streamBaseState[kBytesWritten];
   req.async = !!streamBaseState[kLastWriteWasAsync];
 
   if (err !== 0)
-    return self.destroy(errnoException(err, 'write', req.error), cb);
+    return cb(errnoException(err, 'write', req.error));
 
   if (!req.async) {
     cb();
@@ -264,7 +269,6 @@ function setStreamTimeout(msecs, callback) {
 }
 
 module.exports = {
-  createWriteWrap,
   writevGeneric,
   writeGeneric,
   onStreamRead,
diff --git a/test/parallel/test-net-connect-buffer2.js b/test/parallel/test-net-connect-buffer2.js
new file mode 100644
index 00000000000000..933141bc872b5a
--- /dev/null
+++ b/test/parallel/test-net-connect-buffer2.js
@@ -0,0 +1,56 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const net = require('net');
+
+const tcp = net.Server(common.mustCall((s) => {
+  tcp.close();
+
+  let buf = '';
+  s.setEncoding('utf8');
+  s.on('data', function(d) {
+    buf += d;
+  });
+
+  s.on('end', common.mustCall(function() {
+    console.error('SERVER: end', buf);
+    assert.strictEqual(buf, "L'État, c'est moi");
+    s.end();
+  }));
+}));
+
+tcp.listen(0, common.mustCall(function() {
+  const socket = net.Stream({ highWaterMark: 0 });
+
+  let connected = false;
+  assert.strictEqual(socket.pending, true);
+  socket.connect(this.address().port, common.mustCall(() => connected = true));
+
+  assert.strictEqual(socket.pending, true);
+  assert.strictEqual(socket.connecting, true);
+  assert.strictEqual(socket.readyState, 'opening');
+
+  // Write a string that contains a multi-byte character sequence to test that
+  // `bytesWritten` is incremented with the # of bytes, not # of characters.
+  const a = "L'État, c'est ";
+  const b = 'moi';
+
+  // We're still connecting at this point, so the data is first pushed onto
+  // the connect queue. Make sure that it's not added to `bytesWritten` again
+  // when the actual write happens.
+  const r = socket.write(a, common.mustCall((er) => {
+    console.error('write cb');
+    assert.ok(connected);
+    assert.strictEqual(socket.bytesWritten, Buffer.from(a + b).length);
+    assert.strictEqual(socket.pending, false);
+  }));
+  socket.on('close', common.mustCall(() => {
+    assert.strictEqual(socket.pending, true);
+  }));
+
+  assert.strictEqual(socket.bytesWritten, Buffer.from(a).length);
+  assert.strictEqual(r, false);
+  socket.end(b);
+
+  assert.strictEqual(socket.readyState, 'opening');
+}));
diff --git a/test/parallel/test-net-write-arguments.js b/test/parallel/test-net-write-arguments.js
index 2b81ed7d6a3634..54958847435349 100644
--- a/test/parallel/test-net-write-arguments.js
+++ b/test/parallel/test-net-write-arguments.js
@@ -25,6 +25,7 @@ assert.throws(() => {
   [],
   {}
 ].forEach((value) => {
+  const socket = net.Stream({ highWaterMark: 0 });
   // We need to check the callback since 'error' will only
   // be emitted once per instance.
   assert.throws(() => {
diff --git a/test/parallel/test-wrap-js-stream-exceptions.js b/test/parallel/test-wrap-js-stream-exceptions.js
index b90e46002ccae7..1b4d469a758d77 100644
--- a/test/parallel/test-wrap-js-stream-exceptions.js
+++ b/test/parallel/test-wrap-js-stream-exceptions.js
@@ -10,7 +10,7 @@ process.once('uncaughtException', common.mustCall((err) => {
 }));
 
 const socket = new JSStreamWrap(new Duplex({
-  read: common.mustNotCall(),
+  read: common.mustCall(),
  write: common.mustCall((buffer, data, cb) => {
    throw new Error('exception!');
  })
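Note: the snippet below is a small illustrative sketch, not part of the patch, of the behavior
the lib/_stream_readable.js and lib/_stream_writable.js changes aim for: once a write fails on
a duplex stream with autoDestroy disabled, the readable side is marked as errored, so later
push() calls are rejected and no further 'data' or 'readable' events are expected. The duplex
constructed here is hypothetical and exists only to exercise that path.

'use strict';
const { Duplex } = require('stream');

// A duplex whose writable side always fails, with autoDestroy disabled so the
// stream is not torn down automatically on error.
const d = new Duplex({
  autoDestroy: false,
  read() {},
  write(chunk, encoding, cb) {
    cb(new Error('write failed'));
  }
});

d.on('error', (err) => {
  // By the time 'error' is emitted, onwrite() has already marked
  // _readableState.errored, so pushing more data should be refused.
  console.log('push() after error:', d.push('more data'));  // expected: false
});
d.on('data', () => console.log('unexpected: data after error'));

d.write('hello');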