Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[x] http: client destroy stream #29192

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions doc/api/deprecations.md
Expand Up @@ -2548,13 +2548,42 @@ APIs that do not make sense to use in userland. File streams should always be
opened through their corresponding factory methods [`fs.createWriteStream()`][]
and [`fs.createReadStream()`][]) or by passing a file descriptor in options.

<a id="DEP0XXX"></a>
### DEP0XXX: ClientRequest.abort() is the same ClientRequest.destroy()
<!-- YAML
changes:
ronag marked this conversation as resolved.
Show resolved Hide resolved
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/29192
description: Documentation-only deprecation.
-->

Type: Documentation-only

[`ClientRequest.destroy()`][] is the same as [`ClientRequest.abort()`][].

jasnell marked this conversation as resolved.
Show resolved Hide resolved
<a id="DEP0XXX"></a>
### DEP0XXX: ClientRequest.aborted is the same ClientRequest.destroyed
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/29192
description: Documentation-only deprecation.
-->

Type: Documentation-only

[`ClientRequest.destroyed`][] is the same as [`ClientRequest.aborted`][].


[`--pending-deprecation`]: cli.html#cli_pending_deprecation
[`--throw-deprecation`]: cli.html#cli_throw_deprecation
[`Buffer.allocUnsafeSlow(size)`]: buffer.html#buffer_class_method_buffer_allocunsafeslow_size
[`Buffer.from(array)`]: buffer.html#buffer_class_method_buffer_from_array
[`Buffer.from(buffer)`]: buffer.html#buffer_class_method_buffer_from_buffer
[`Buffer.isBuffer()`]: buffer.html#buffer_class_method_buffer_isbuffer_obj
[`Cipher`]: crypto.html#crypto_class_cipher
[`ClientRequest.abort()`]: http.html#http_request_abort
[`ClientRequest.destroy()`]: stream.html#stream_readable_destroy_error
[`Decipher`]: crypto.html#crypto_class_decipher
[`EventEmitter.listenerCount(emitter, eventName)`]: events.html#events_eventemitter_listenercount_emitter_eventname
[`REPLServer.clearBufferedCommand()`]: repl.html#repl_replserver_clearbufferedcommand
Expand Down
45 changes: 17 additions & 28 deletions doc/api/http.md
Expand Up @@ -565,10 +565,14 @@ srv.listen(1337, '127.0.0.1', () => {
### request.abort()
<!-- YAML
added: v0.3.8
deprecated: REPLACEME
-->

> Stability: 0 - Deprecated. Use [`request.destroy()`][] instead.

Marks the request as aborting. Calling this will cause remaining data
in the response to be dropped and the socket to be destroyed.
in the response to be dropped and the socket to be destroyed. After
calling this method, no further errors will be emitted.

### request.aborted
<!-- YAML
Expand All @@ -579,6 +583,8 @@ changes:
description: The `aborted` property is no longer a timestamp number.
-->

> Stability: 0 - Deprecated. Use [`request.destroyed`][] instead.

* {boolean}

The `request.aborted` property will be `true` if the request has
Expand Down Expand Up @@ -2319,43 +2325,24 @@ In the case of a connection error, the following events will be emitted:
* `'error'`
* `'close'`

In the case of a premature connection close before the response is received,
the following events will be emitted in the following order:

* `'socket'`
* `'error'` with an error with message `'Error: socket hang up'` and code
`'ECONNRESET'`
* `'close'`

In the case of a premature connection close after the response is received,
the following events will be emitted in the following order:

* `'socket'`
* `'response'`
* `'data'` any number of times, on the `res` object
* (connection closed here)
* `'aborted'` on the `res` object
* `'close'`
* `'close'` on the `res` object

If `req.abort()` is called before the connection succeeds, the following events
will be emitted in the following order:
If `req.destroy()` is called before the connection succeeds, the following
events will be emitted in the following order:

* `'socket'`
* (`req.abort()` called here)
* (`req.destroy(err)` called here)
* `'abort'`
* `'error'` with an error with message `'Error: socket hang up'` and code
`'ECONNRESET'`
* `'error'` if `err` was provided in `req.destroy(err)`.
* `'close'`

If `req.abort()` is called after the response is received, the following events
will be emitted in the following order:
If `req.destroy()` is called after the response is received, the following
events will be emitted in the following order:

* `'socket'`
* `'response'`
* `'data'` any number of times, on the `res` object
* (`req.abort()` called here)
* (`req.destroy(err)` called here)
* `'abort'`
* `'error'` if `err` was provided in `req.destroy(err)`.
* `'aborted'` on the `res` object
* `'close'`
* `'close'` on the `res` object
Expand Down Expand Up @@ -2392,6 +2379,8 @@ not abort the request or do anything besides add a `'timeout'` event.
[`net.createConnection()`]: net.html#net_net_createconnection_options_connectlistener
[`new URL()`]: url.html#url_constructor_new_url_input_base
[`removeHeader(name)`]: #http_request_removeheader_name
[`request.destroy()`]: stream.html#stream_readable_destroy_error
[`request.destroyed`]: stream.html#stream_readable_destroyed
[`request.end()`]: #http_request_end_data_encoding_callback
[`request.flushHeaders()`]: #http_request_flushheaders
[`request.getHeader()`]: #http_request_getheader_name
Expand Down
118 changes: 70 additions & 48 deletions lib/_http_client.js
Expand Up @@ -23,6 +23,7 @@

const { Object } = primordials;

const { destroy, kWritableState } = require('internal/streams/destroy');
const net = require('net');
const url = require('url');
const assert = require('internal/assert');
Expand Down Expand Up @@ -190,14 +191,24 @@ function ClientRequest(input, options, cb) {

this._ended = false;
this.res = null;
// The aborted property is for backwards compat and is not
// strictly the same as destroyed as it can be written
// to by the user.
this.aborted = false;
this.timeoutCb = null;
this.upgradeOrConnect = false;
this.parser = null;
this.maxHeadersCount = null;
this.reusedSocket = false;

let called = false;
// Used by destroyImpl.
this[kWritableState] = {
errorEmitted: false,
destroyed: false,
emitClose: true,
socketPending: true,
destroyCallback: null
};
ronag marked this conversation as resolved.
Show resolved Hide resolved

if (this.agent) {
// If there is an agent we should default to Connection:keep-alive,
Expand Down Expand Up @@ -261,11 +272,17 @@ function ClientRequest(input, options, cb) {
}

const oncreate = (err, socket) => {
if (called)
const state = this[kWritableState];
if (!state.socketPending)
return;
called = true;
state.socketPending = false;
if (err) {
process.nextTick(() => this.emit('error', err));
const { destroyCallback: cb } = state;
if (cb) {
cb(err);
} else {
this.destroy(err);
}
mcollina marked this conversation as resolved.
Show resolved Hide resolved
return;
}
this.onSocket(socket);
Expand All @@ -280,9 +297,10 @@ function ClientRequest(input, options, cb) {
this._last = true;
this.shouldKeepAlive = false;
if (typeof options.createConnection === 'function') {
const state = this[kWritableState];
const newSocket = options.createConnection(options, oncreate);
if (newSocket && !called) {
called = true;
if (newSocket && state.socketPending) {
state.socketPending = false;
this.onSocket(newSocket);
} else {
return;
Expand All @@ -298,6 +316,12 @@ function ClientRequest(input, options, cb) {
Object.setPrototypeOf(ClientRequest.prototype, OutgoingMessage.prototype);
Object.setPrototypeOf(ClientRequest, OutgoingMessage);

Object.defineProperty(ClientRequest.prototype, 'destroyed', {
get() {
return this[kWritableState].destroyed;
}
});

ClientRequest.prototype._finish = function _finish() {
DTRACE_HTTP_CLIENT_REQUEST(this, this.socket);
OutgoingMessage.prototype._finish.call(this);
Expand All @@ -311,28 +335,37 @@ ClientRequest.prototype._implicitHeader = function _implicitHeader() {
this[kOutHeaders]);
};

ClientRequest.prototype.abort = function abort() {
if (!this.aborted) {
process.nextTick(emitAbortNT.bind(this));
}
ClientRequest.prototype.destroy = destroy;
ClientRequest.prototype._destroy = function(err, cb) {
this.aborted = true;
process.nextTick(emitAbortNT, this);

// If we're aborting, we don't care about any more response data.
if (this.res) {
this.res._dump();
}

// In the event that we don't have a socket, we will pop out of
// the request queue through handling in onSocket.
if (this.socket) {
if (this.upgradeOrConnect) {
// We're detached from socket.
cb(err);
} else if (this.socket) {
// in-progress
this.socket.destroy();
this.socket.destroy(err, cb);
} else if (this[kWritableState].socketPending) {
// In the event that we don't have a socket, we will pop out of
// the request queue through handling in onSocket.
this[kWritableState].destroyCallback = (er) => cb(er || err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to wait for the 'socket' event to be emitted instead of adding one more state parameter: https://nodejs.org/api/http.html#http_event_socket.

Copy link
Member Author

@ronag ronag Sep 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That won't work, the socket event is not always emitted. See the error case in oncreate, i.e. close to where your other comment is.

The error from oncreate needs to be properly forwarded to the _destroy callback.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a user perspective, I think we should be swallowing that error if destroy() was called before hand.

Do we test that calling destroy() before we have a socket actually calls socket.destroy()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be further refactored if/once #29656 is merged. But it basically does the same thing.

Copy link
Member Author

@ronag ronag Sep 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be swallowing that error if destroy() was called before hand.

This confuses me. Isn't it a bit inconsistent with the discussion in #29197? @lpinca might have input?

Do we test that calling destroy() before we have a socket actually calls socket.destroy()?

Not sure, I will check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we test that calling destroy() before we have a socket actually calls socket.destroy()?

Test added

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The possibility of deadlocks/having a callback not called in the destroy process makes me nervous. The current logic is significantly simpler, even if less correct, and I feel this might bite us in the future.

Copy link
Member Author

@ronag ronag Sep 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The possibility of deadlocks/having a callback not called in the destroy process makes me nervous.

That's the way Node works? This is not the only case? What can we do to make you less nervous? More tests?

The current logic is significantly simpler, even if less correct, and I feel this might bite us in the future.

I strongly disagree with this. In my opinion this kind of mentality will stagnate the project. Shouldn't we strive after correctness and consistency?

We have similar scenarios in a lot of different places and I have pending PR's (e.g. #29656) with efforts on this.

If this (swallow error) is the way we go then I'd like to ask #29197 to be reconsidered (to "no error after destroy") in order to achieve some form of consistency. Otherwise, we will never have "correct" code even in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we strive after correctness and consistency?

No. We should strive to not break current users, and then consistency. Correctness it's extremely hard to measure because there are no specifications for any of this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should strive to not break current users

How does this break current users?

} else {
cb(err);
}
};

ClientRequest.prototype.abort = function abort() {
this.destroy();
};

function emitAbortNT() {
this.emit('abort');
function emitAbortNT(self) {
self.emit('abort');
}

function ondrain() {
Expand Down Expand Up @@ -363,24 +396,23 @@ function socketCloseListener() {
res.aborted = true;
res.emit('aborted');
}
req.emit('close');
if (!res.aborted && res.readable) {
res.on('end', function() {
// We can only destroy req after 'end'. Otherwise we will dump the
// data.
req.destroy();
this.emit('close');
});
res.push(null);
} else {
req.destroy();
res.emit('close');
}
} else {
if (!req.socket._hadError) {
// This socket error fired before we started to
// receive a response. The error needs to
// fire on the request.
req.socket._hadError = true;
req.emit('error', connResetException('socket hang up'));
}
req.emit('close');
// This socket error fired before we started to
// receive a response. The error needs to
// fire on the request.
req.destroy(connResetException('socket hang up'));
}

// Too bad. That output wasn't getting written.
Expand All @@ -400,13 +432,6 @@ function socketErrorListener(err) {
const req = socket._httpMessage;
debug('SOCKET ERROR:', err.message, err.stack);

if (req) {
// For Safety. Some additional errors might fire later on
// and we need to make sure we don't double-fire the error event.
req.socket._hadError = true;
req.emit('error', err);
}

const parser = socket.parser;
if (parser) {
parser.finish();
Expand All @@ -416,7 +441,7 @@ function socketErrorListener(err) {
// Ensure that no further data will come out of the socket
socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.destroy();
req.destroy(err);
}

function freeSocketErrorListener(err) {
Expand All @@ -431,17 +456,15 @@ function socketOnEnd() {
const req = this._httpMessage;
const parser = this.parser;

if (!req.res && !req.socket._hadError) {
// If we don't have a response then we know that the socket
// ended prematurely and we need to emit an error on the request.
req.socket._hadError = true;
req.emit('error', connResetException('socket hang up'));
}
if (parser) {
parser.finish();
freeParser(parser, req, socket);
}
socket.destroy();

// If we don't have a response then we know that the socket
// ended prematurely and we need to emit an error on the request.
const err = !req.res ? connResetException('socket hang up') : null;
mcollina marked this conversation as resolved.
Show resolved Hide resolved
req.destroy(err);
}

function socketOnData(d) {
Expand All @@ -456,9 +479,7 @@ function socketOnData(d) {
prepareError(ret, parser, d);
debug('parse error', ret);
freeParser(parser, req, socket);
socket.destroy();
req.socket._hadError = true;
req.emit('error', ret);
req.destroy(ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade (if status code 101) or CONNECT
const bytesParsed = ret;
Expand Down Expand Up @@ -490,10 +511,10 @@ function socketOnData(d) {
socket.readableFlowing = null;

req.emit(eventName, res, socket, bodyHead);
req.emit('close');
req.destroy();
} else {
// Requested Upgrade or used CONNECT method, but have no handler.
socket.destroy();
req.destroy();
}
} else if (parser.incoming && parser.incoming.complete &&
// When the status code is informational (100, 102-199),
Expand Down Expand Up @@ -582,7 +603,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
// If the user did not listen for the 'response' event, then they
// can't possibly read the data, so we ._dump() it into the void
// so that the socket doesn't hang there in a paused state.
if (req.aborted || !req.emit('response', res))
if (req.destroyed || !req.emit('response', res))
res._dump();

if (method === 'HEAD')
Expand Down Expand Up @@ -720,10 +741,11 @@ ClientRequest.prototype.onSocket = function onSocket(socket) {
};

function onSocketNT(req, socket) {
if (req.aborted) {
if (req.destroyed) {
const { destroyCallback: cb } = req[kWritableState];
// If we were aborted while waiting for a socket, skip the whole thing.
if (!req.agent) {
socket.destroy();
socket.destroy(null, cb);
mcollina marked this conversation as resolved.
Show resolved Hide resolved
} else {
req.emit('close');
socket.emit('free');
Expand Down