Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: invoke callback with ERR_STREAM_DESTROYED if the socket is destroyed #36692

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
140 changes: 76 additions & 64 deletions lib/_http_outgoing.js
Expand Up @@ -51,10 +51,6 @@ const { Buffer } = require('buffer');
const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
const checkInvalidHeaderChar = common._checkInvalidHeaderChar;
const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol }
} = require('internal/async_hooks');
const {
codes: {
ERR_HTTP_HEADERS_SENT,
Expand All @@ -75,6 +71,10 @@ const {
} = require('internal/errors');
const { validateString } = require('internal/validators');
const { isUint8Array } = require('internal/util/types');
const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol }
} = require('internal/async_hooks');

const HIGH_WATER_MARK = getDefaultHighWaterMark();
const { CRLF, debug } = common;
Expand Down Expand Up @@ -312,7 +312,6 @@ OutgoingMessage.prototype.destroy = function destroy(error) {
return this;
};


// This abstract either writing directly to the socket or buffering it.
OutgoingMessage.prototype._send = function _send(data, encoding, callback) {
// This is a shameful hack to get the headers and first body chunk onto
Expand Down Expand Up @@ -341,17 +340,21 @@ OutgoingMessage.prototype._send = function _send(data, encoding, callback) {
OutgoingMessage.prototype._writeRaw = _writeRaw;
function _writeRaw(data, encoding, callback) {
const conn = this.socket;
if (conn && conn.destroyed) {
// The socket was destroyed. If we're still trying to write to it,
// then we haven't gotten the 'close' event yet.
return false;
}

if (typeof encoding === 'function') {
callback = encoding;
encoding = null;
}

if (conn?.destroyed) {
if (typeof callback === 'function') {
process.nextTick(callback, new ERR_STREAM_DESTROYED('write'));
}
// The socket was destroyed. If we're still trying to write to it,
// then we haven't gotten the 'close' event yet.
return false;
}

if (conn && conn._httpMessage === this && conn.writable) {
// There might be pending data in the this.output buffer.
if (this.outputData.length) {
Expand Down Expand Up @@ -689,23 +692,6 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
return ret;
};

function onError(msg, err, callback) {
const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined;
defaultTriggerAsyncIdScope(triggerAsyncId,
process.nextTick,
emitErrorNt,
msg,
err,
callback);
}

function emitErrorNt(msg, err, callback) {
callback(err);
if (typeof msg.emit === 'function' && !msg._closed) {
msg.emit('error', err);
}
}

function write_(msg, chunk, encoding, callback, fromEnd) {
if (typeof callback !== 'function')
callback = nop;
Expand Down Expand Up @@ -735,6 +721,7 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
} else {
process.nextTick(callback, err);
}
msg.destroy(err);
return false;
}

Expand Down Expand Up @@ -804,62 +791,91 @@ OutgoingMessage.prototype.addTrailers = function addTrailers(headers) {
}
};

function onFinish(outmsg) {
if (outmsg && outmsg.socket && outmsg.socket._hadError) return;
outmsg.emit('finish');
function onFinish(err) {
if (err || this.socket?._hadError) return;
this.emit('finish');
}

function onError(msg, err, callback) {
const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined;
defaultTriggerAsyncIdScope(triggerAsyncId,
process.nextTick,
emitErrorNt,
msg,
err,
callback);
}

OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
function emitErrorNt(msg, err, callback) {
callback?.(err);
if (typeof msg.emit === 'function' && !msg._closed) {
msg.emit('error', err);
}
}

OutgoingMessage.prototype.end = function end(chunk, encoding, cb) {
if (typeof chunk === 'function') {
callback = chunk;
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === 'function') {
callback = encoding;
cb = encoding;
encoding = null;
}

if (chunk) {
if (chunk !== null && chunk !== undefined) {
if (this.finished) {
onError(this,
new ERR_STREAM_WRITE_AFTER_END(),
typeof callback !== 'function' ? nop : callback);
onError(this, new ERR_STREAM_WRITE_AFTER_END(), cb);
return this;
}

if (this.socket) {
this.socket.cork();
this.socket?.cork();
}

write_(this, chunk, encoding, null, true);
} else if (this.finished) {
if (typeof callback === 'function') {
if (!this.writableFinished) {
this.on('finish', callback);
} else {
callback(new ERR_STREAM_ALREADY_FINISHED('end'));
}

let err;
if (!this.finished) {
if (!this._header) {
if (this.socket) {
this.socket.cork();
}
this._contentLength = 0;
this._implicitHeader();
}
return this;
} else if (!this._header) {
if (this.socket) {
this.socket.cork();

const finish = FunctionPrototypeBind(onFinish, this);

if (this._hasBody && this.chunkedEncoding) {
this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
} else {
// Force a flush, HACK.
this._send('', 'latin1', finish);
}

this._contentLength = 0;
this._implicitHeader();
this.finished = true; // aka. WritableState.ended
} else if (this.writableFinished) {
err = new ERR_STREAM_ALREADY_FINISHED('end');
} else if (this.destroyed) {
err = new ERR_STREAM_DESTROYED('end');
}

if (typeof callback === 'function')
this.once('finish', callback);

const finish = FunctionPrototypeBind(onFinish, undefined, this);
if (typeof cb === 'function') {
if (err || this.writableFinished) {
process.nextTick(cb, err);
} else {
// TODO (fix): What if error? See kOnFinished in writable.js.
this.once('finish', cb);
}
}

if (this._hasBody && this.chunkedEncoding) {
this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
} else {
// Force a flush, HACK.
this._send('', 'latin1', finish);
if (err) {
if (this.socket) {
this.socket.uncork();
}
return this;
}

if (this.socket) {
Expand All @@ -869,14 +885,10 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
}
this[kCorked] = 0;

this.finished = true;

// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
debug('outgoing message end.');
if (this.outputData.length === 0 &&
this.socket &&
this.socket._httpMessage === this) {
if (this.outputData.length === 0 && this.socket?._httpMessage === this) {
this._finish();
}

Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-http-outgoing-socket-destroyed.js
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
const { createServer, request } = require('http');

{
const server = createServer((req, res) => {
server.close();

req.socket.destroy();

res.write('hello', common.expectsError({
code: 'ERR_STREAM_DESTROYED'
}));
});

server.listen(0, common.mustCall(() => {
const req = request({
host: 'localhost',
port: server.address().port
});

req.on('response', common.mustNotCall());
req.on('error', common.expectsError({
code: 'ECONNRESET'
}));

req.end();
}));
}

{
const server = createServer((req, res) => {
res.write('hello');
req.resume();

const onError = common.expectsError({
code: 'ERR_STREAM_DESTROYED'
});

res.on('close', () => {
res.write('world', common.mustCall((err) => {
onError(err);
server.close();
}));
});
});

server.listen(0, common.mustCall(() => {
const req = request({
host: 'localhost',
port: server.address().port
});

req.on('response', common.mustCall((res) => {
res.socket.destroy();
}));

req.end();
}));
}