Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix two missing async hooks destroy calls #23272

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/http/bench-parser.js
Expand Up @@ -25,7 +25,7 @@ function main({ len, n }) {
bench.start();
for (var i = 0; i < n; i++) {
parser.execute(header, 0, header.length);
parser.reinitialize(REQUEST);
parser.reinitialize(REQUEST, i > 0);
}
bench.end(n);
}
Expand Down
2 changes: 1 addition & 1 deletion benchmark/misc/freelist.js
Expand Up @@ -9,7 +9,7 @@ const bench = common.createBenchmark(main, {
});

function main({ n }) {
const FreeList = require('internal/freelist');
const { FreeList } = require('internal/freelist');
const poolSize = 1000;
const list = new FreeList('test', poolSize, Object);
var j;
Expand Down
2 changes: 1 addition & 1 deletion lib/_http_agent.js
Expand Up @@ -167,7 +167,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
var socket = this.freeSockets[name].shift();
// Guard against an uninitialized or user supplied Socket.
if (socket._handle && typeof socket._handle.asyncReset === 'function') {
// Assign the handle a new asyncId and run any init() hooks.
// Assign the handle a new asyncId and run any destroy()/init() hooks.
socket._handle.asyncReset();
socket[async_id_symbol] = socket._handle.getAsyncId();
}
Expand Down
3 changes: 2 additions & 1 deletion lib/_http_client.js
Expand Up @@ -47,6 +47,7 @@ const {
ERR_UNESCAPED_CHARACTERS
} = require('internal/errors').codes;
const { validateTimerDuration } = require('internal/timers');
const is_reused_symbol = require('internal/freelist').symbols.is_reused_symbol;

const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;

Expand Down Expand Up @@ -631,7 +632,7 @@ function tickOnSocket(req, socket) {
var parser = parsers.alloc();
req.socket = socket;
req.connection = socket;
parser.reinitialize(HTTPParser.RESPONSE);
parser.reinitialize(HTTPParser.RESPONSE, parser[is_reused_symbol]);
parser.socket = socket;
parser.outgoing = req;
req.parser = parser;
Expand Down
2 changes: 1 addition & 1 deletion lib/_http_common.js
Expand Up @@ -23,7 +23,7 @@

const { methods, HTTPParser } = internalBinding('http_parser');

const FreeList = require('internal/freelist');
const { FreeList } = require('internal/freelist');
const { ondrain } = require('internal/http');
const incoming = require('_http_incoming');
const {
Expand Down
3 changes: 2 additions & 1 deletion lib/_http_server.js
Expand Up @@ -42,6 +42,7 @@ const {
defaultTriggerAsyncIdScope,
getOrSetAsyncId
} = require('internal/async_hooks');
const is_reused_symbol = require('internal/freelist').symbols.is_reused_symbol;
const { IncomingMessage } = require('_http_incoming');
const {
ERR_HTTP_HEADERS_SENT,
Expand Down Expand Up @@ -338,7 +339,7 @@ function connectionListenerInternal(server, socket) {
socket.on('timeout', socketOnTimeout);

var parser = parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST);
parser.reinitialize(HTTPParser.REQUEST, parser[is_reused_symbol]);
parser.socket = socket;
socket.parser = parser;

Expand Down
21 changes: 17 additions & 4 deletions lib/internal/freelist.js
@@ -1,5 +1,7 @@
'use strict';

const is_reused_symbol = Symbol('isReused');

class FreeList {
constructor(name, max, ctor) {
this.name = name;
Expand All @@ -9,9 +11,15 @@ class FreeList {
}

alloc() {
return this.list.length ?
this.list.pop() :
this.ctor.apply(this, arguments);
let item;
if (this.list.length > 0) {
item = this.list.pop();
item[is_reused_symbol] = true;
} else {
item = this.ctor.apply(this, arguments);
item[is_reused_symbol] = false;
}
return item;
}

free(obj) {
Expand All @@ -23,4 +31,9 @@ class FreeList {
}
}

module.exports = FreeList;
module.exports = {
FreeList,
symbols: {
is_reused_symbol
}
mcollina marked this conversation as resolved.
Show resolved Hide resolved
};
9 changes: 9 additions & 0 deletions src/async_wrap.cc
Expand Up @@ -563,6 +563,7 @@ AsyncWrap::AsyncWrap(Environment* env,
CHECK_NE(provider, PROVIDER_NONE);
CHECK_GE(object->InternalFieldCount(), 1);

async_id_ = -1;
// Use AsyncReset() call to execute the init() callbacks.
AsyncReset(execution_async_id, silent);
}
Expand Down Expand Up @@ -606,6 +607,14 @@ void AsyncWrap::EmitDestroy(Environment* env, double async_id) {
// and reused over their lifetime. This way a new uid can be assigned when
// the resource is pulled out of the pool and put back into use.
void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
if (async_id_ != -1) {
// This instance was in use before, we have already emitted an init with
// its previous async_id and need to emit a matching destroy for that
// before generating a new async_id.
EmitDestroy(env(), async_id_);
}

// Now we can assign a new async_id_ to this instance.
async_id_ =
execution_async_id == -1 ? env()->new_async_id() : execution_async_id;
trigger_async_id_ = env()->get_default_trigger_async_id();
Expand Down
10 changes: 8 additions & 2 deletions src/node_http_parser.cc
Expand Up @@ -465,6 +465,8 @@ class Parser : public AsyncWrap, public StreamListener {
Environment* env = Environment::GetCurrent(args);

CHECK(args[0]->IsInt32());
CHECK(args[1]->IsBoolean());
bool isReused = args[1]->IsTrue();
http_parser_type type =
static_cast<http_parser_type>(args[0].As<Int32>()->Value());

Expand All @@ -473,8 +475,12 @@ class Parser : public AsyncWrap, public StreamListener {
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
// Should always be called from the same context.
CHECK_EQ(env, parser->env());
// The parser is being reused. Reset the async id and call init() callbacks.
parser->AsyncReset();
// This parser has either just been created or it is being reused.
// We must only call AsyncReset for the latter case, because AsyncReset has
// already been called via the constructor for the former case.
if (isReused) {
parser->AsyncReset();
}
parser->Init(type);
}

Expand Down
10 changes: 2 additions & 8 deletions test/async-hooks/test-graph.http.js
Expand Up @@ -38,20 +38,14 @@ process.on('exit', function() {
{ type: 'HTTPPARSER',
id: 'httpparser:1',
triggerAsyncId: 'tcpserver:1' },
{ type: 'HTTPPARSER',
id: 'httpparser:2',
triggerAsyncId: 'tcpserver:1' },
{ type: 'TCPWRAP', id: 'tcp:2', triggerAsyncId: 'tcpserver:1' },
{ type: 'Timeout', id: 'timeout:1', triggerAsyncId: 'tcp:2' },
{ type: 'HTTPPARSER',
id: 'httpparser:3',
triggerAsyncId: 'tcp:2' },
{ type: 'HTTPPARSER',
id: 'httpparser:4',
id: 'httpparser:2',
triggerAsyncId: 'tcp:2' },
{ type: 'Timeout',
id: 'timeout:2',
triggerAsyncId: 'httpparser:4' },
triggerAsyncId: 'httpparser:2' },
{ type: 'SHUTDOWNWRAP',
id: 'shutdown:1',
triggerAsyncId: 'tcp:2' } ]
Expand Down
84 changes: 84 additions & 0 deletions test/parallel/test-async-hooks-http-agent-destroy.js
@@ -0,0 +1,84 @@
'use strict';
// Flags: --expose-internals
const common = require('../common');
const assert = require('assert');
const { async_id_symbol } = require('internal/async_hooks').symbols;
const async_hooks = require('async_hooks');
const http = require('http');

// Regression test for https://github.com/nodejs/node/issues/19859
// Checks that an http.Agent emits a destroy for the old asyncId before calling
// asyncReset()s when reusing a socket handle. The setup is nearly identical to
// parallel/test-async-hooks-http-agent (which focuses on the assertion that
// a fresh asyncId is assigned to the net.Socket instance).

const destroyedIds = new Set();
async_hooks.createHook({
destroy: common.mustCallAtLeast((asyncId) => {
destroyedIds.add(asyncId);
}, 1)
}).enable();

// Make sure a single socket is transparently reused for 2 requests.
const agent = new http.Agent({
keepAlive: true,
keepAliveMsecs: Infinity,
maxSockets: 1
});

const server = http.createServer(common.mustCall((req, res) => {
req.once('data', common.mustCallAtLeast(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.write('foo');
}));
req.on('end', common.mustCall(() => {
res.end('bar');
}));
}, 2)).listen(0, common.mustCall(() => {
const port = server.address().port;
const payload = 'hello world';

// First request. This is useless except for adding a socket to the
// agent’s pool for reuse.
const r1 = http.request({
agent, port, method: 'POST'
}, common.mustCall((res) => {
// Remember which socket we used.
const socket = res.socket;
const asyncIdAtFirstRequest = socket[async_id_symbol];
assert.ok(asyncIdAtFirstRequest > 0, `${asyncIdAtFirstRequest} > 0`);
// Check that request and response share their socket.
assert.strictEqual(r1.socket, socket);

res.on('data', common.mustCallAtLeast(() => {}));
res.on('end', common.mustCall(() => {
// setImmediate() to give the agent time to register the freed socket.
setImmediate(common.mustCall(() => {
// The socket is free for reuse now.
assert.strictEqual(socket[async_id_symbol], -1);

// second request:
const r2 = http.request({
agent, port, method: 'POST'
}, common.mustCall((res) => {
assert.ok(destroyedIds.has(asyncIdAtFirstRequest));

// Empty payload, to hit the “right” code path.
r2.end('');

res.on('data', common.mustCallAtLeast(() => {}));
res.on('end', common.mustCall(() => {
// Clean up to let the event loop stop.
server.close();
agent.destroy();
}));
}));

// Schedule a payload to be written immediately, but do not end the
// request just yet.
r2.write(payload);
}));
}));
}));
r1.end(payload);
}));
61 changes: 61 additions & 0 deletions test/parallel/test-async-hooks-http-parser-destroy.js
@@ -0,0 +1,61 @@
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
const assert = require('assert');
const async_hooks = require('async_hooks');
const http = require('http');

// Regression test for https://github.com/nodejs/node/issues/19859.
// Checks that matching destroys are emitted when creating new/reusing old http
// parser instances.

const N = 50;
const KEEP_ALIVE = 100;

const createdIds = [];
const destroyedIds = [];
async_hooks.createHook({
init: common.mustCallAtLeast((asyncId, type) => {
if (type === 'HTTPPARSER') {
createdIds.push(asyncId);
}
}, N),
destroy: (asyncId) => {
destroyedIds.push(asyncId);
}
}).enable();

const server = http.createServer(function(req, res) {
res.end('Hello');
});

const keepAliveAgent = new http.Agent({
keepAlive: true,
keepAliveMsecs: KEEP_ALIVE,
});

const countdown = new Countdown(N, () => {
server.close(() => {
// give the server sockets time to close (which will also free their
// associated parser objects) after the server has been closed.
setTimeout(() => {
createdIds.forEach((createdAsyncId) => {
assert.ok(destroyedIds.indexOf(createdAsyncId) >= 0);
});
}, KEEP_ALIVE * 2);
});
});

server.listen(0, function() {
for (let i = 0; i < N; ++i) {
(function makeRequest() {
http.get({
port: server.address().port,
agent: keepAliveAgent
}, function(res) {
countdown.dec();
res.resume();
});
})();
}
});
25 changes: 12 additions & 13 deletions test/parallel/test-freelist.js
Expand Up @@ -4,28 +4,27 @@

require('../common');
const assert = require('assert');
const FreeList = require('internal/freelist');
const { FreeList } = require('internal/freelist');

assert.strictEqual(typeof FreeList, 'function');

const flist1 = new FreeList('flist1', 3, String);
const flist1 = new FreeList('flist1', 3, Object);

// Allocating when empty, should not change the list size
const result = flist1.alloc('test');
assert.strictEqual(typeof result, 'string');
assert.strictEqual(result, 'test');
const result = flist1.alloc();
assert.strictEqual(typeof result, 'object');
assert.strictEqual(flist1.list.length, 0);

// Exhaust the free list
assert(flist1.free('test1'));
assert(flist1.free('test2'));
assert(flist1.free('test3'));
assert(flist1.free({ id: 'test1' }));
assert(flist1.free({ id: 'test2' }));
assert(flist1.free({ id: 'test3' }));

// Now it should not return 'true', as max length is exceeded
assert.strictEqual(flist1.free('test4'), false);
assert.strictEqual(flist1.free('test5'), false);
assert.strictEqual(flist1.free({ id: 'test4' }), false);
assert.strictEqual(flist1.free({ id: 'test5' }), false);

// At this point 'alloc' should just return the stored values
assert.strictEqual(flist1.alloc(), 'test3');
assert.strictEqual(flist1.alloc(), 'test2');
assert.strictEqual(flist1.alloc(), 'test1');
assert.strictEqual(flist1.alloc().id, 'test3');
assert.strictEqual(flist1.alloc().id, 'test2');
assert.strictEqual(flist1.alloc().id, 'test1');
4 changes: 2 additions & 2 deletions test/parallel/test-http-parser.js
Expand Up @@ -98,7 +98,7 @@ function expectBody(expected) {
throw new Error('hello world');
};

parser.reinitialize(HTTPParser.REQUEST);
parser.reinitialize(HTTPParser.REQUEST, false);

assert.throws(
() => { parser.execute(request, 0, request.length); },
Expand Down Expand Up @@ -558,7 +558,7 @@ function expectBody(expected) {
parser[kOnBody] = expectBody('ping');
parser.execute(req1, 0, req1.length);

parser.reinitialize(REQUEST);
parser.reinitialize(REQUEST, false);
parser[kOnBody] = expectBody('pong');
parser[kOnHeadersComplete] = onHeadersComplete2;
parser.execute(req2, 0, req2.length);
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-internal-modules-expose.js
Expand Up @@ -7,5 +7,5 @@ const config = process.binding('config');

console.log(config, process.argv);

assert.strictEqual(typeof require('internal/freelist'), 'function');
assert.strictEqual(typeof require('internal/freelist').FreeList, 'function');
assert.strictEqual(config.exposeInternals, true);