Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: add maxTotalSockets to agent class #33617

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/api/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,16 @@ added: v0.3.6
By default set to `Infinity`. Determines how many concurrent sockets the agent
can have open per origin. Origin is the returned value of [`agent.getName()`][].

### `agent.maxTotalSockets`
<!-- YAML
added: REPLACEME
-->

* {number}

By default set to `Infinity`. Determines how many concurrent sockets the agent
can have open. Unlike `maxSockets`, this parameter applies across all origins.

### `agent.requests`
<!-- YAML
added: v0.5.9
Expand Down
52 changes: 48 additions & 4 deletions lib/_http_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
'use strict';

const {
NumberIsNaN,
ObjectKeys,
ObjectSetPrototypeOf,
ObjectValues,
Expand All @@ -36,11 +37,14 @@ const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_OPT_VALUE,
ERR_OUT_OF_RANGE,
},
} = require('internal/errors');
const { once } = require('internal/util');
const { validateNumber } = require('internal/validators');

const kOnKeylog = Symbol('onkeylog');
const kRequestOptions = Symbol('requestOptions');
// New Agent code.

// The largest departure from the previous implementation is that
Expand Down Expand Up @@ -88,11 +92,22 @@ function Agent(options) {
this.maxSockets = this.options.maxSockets || Agent.defaultMaxSockets;
this.maxFreeSockets = this.options.maxFreeSockets || 256;
this.scheduling = this.options.scheduling || 'fifo';
this.maxTotalSockets = this.options.maxTotalSockets;
this.totalSocketCount = 0;

if (this.scheduling !== 'fifo' && this.scheduling !== 'lifo') {
throw new ERR_INVALID_OPT_VALUE('scheduling', this.scheduling);
}

if (this.maxTotalSockets !== undefined) {
validateNumber(this.maxTotalSockets, 'maxTotalSockets');
if (this.maxTotalSockets <= 0 || NumberIsNaN(this.maxTotalSockets))
throw new ERR_OUT_OF_RANGE('maxTotalSockets', '> 0',
this.maxTotalSockets);
} else {
this.maxTotalSockets = Infinity;
}

this.on('free', (socket, options) => {
const name = this.getName(options);
debug('agent.on(free)', name);
Expand Down Expand Up @@ -131,7 +146,8 @@ function Agent(options) {
if (this.sockets[name])
count += this.sockets[name].length;

if (count > this.maxSockets ||
if (this.totalSocketCount > this.maxTotalSockets ||
count > this.maxSockets ||
freeLen >= this.maxFreeSockets ||
!this.keepSocketAlive(socket)) {
socket.destroy();
Expand Down Expand Up @@ -246,7 +262,9 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
this.reuseSocket(socket, req);
setRequestSocket(this, req, socket);
this.sockets[name].push(socket);
} else if (sockLen < this.maxSockets) {
this.totalSocketCount++;
} else if (sockLen < this.maxSockets &&
this.totalSocketCount < this.maxTotalSockets) {
debug('call onSocket', sockLen, freeLen);
// If we are under maxSockets create a new one.
this.createSocket(req, options, (err, socket) => {
Expand All @@ -261,6 +279,10 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
if (!this.requests[name]) {
this.requests[name] = [];
}

// Used to create sockets for pending requests from different origin
req[kRequestOptions] = options;

this.requests[name].push(req);
}
};
Expand All @@ -286,7 +308,8 @@ Agent.prototype.createSocket = function createSocket(req, options, cb) {
this.sockets[name] = [];
}
this.sockets[name].push(s);
debug('sockets', name, this.sockets[name].length);
this.totalSocketCount++;
debug('sockets', name, this.sockets[name].length, this.totalSocketCount);
installListeners(this, s, options);
cb(null, s);
});
Expand Down Expand Up @@ -392,13 +415,33 @@ Agent.prototype.removeSocket = function removeSocket(s, options) {
// Don't leak
if (sockets[name].length === 0)
delete sockets[name];
this.totalSocketCount--;
}
}
}

let req;
if (this.requests[name] && this.requests[name].length) {
debug('removeSocket, have a request, make a socket');
const req = this.requests[name][0];
req = this.requests[name][0];
} else {
rickyes marked this conversation as resolved.
Show resolved Hide resolved
// TODO(rickyes): this logic will not be FIFO across origins.
// There might be older requests in a different origin, but
// if the origin which releases the socket has pending requests
// that will be prioritized.
for (const prop in this.requests) {
rickyes marked this conversation as resolved.
Show resolved Hide resolved
// Check whether this specific origin is already at maxSockets
if (this.sockets[prop] && this.sockets[prop].length) break;
debug('removeSocket, have a request with different origin,' +
' make a socket');
req = this.requests[prop][0];
rickyes marked this conversation as resolved.
Show resolved Hide resolved
options = req[kRequestOptions];
rickyes marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}
rickyes marked this conversation as resolved.
Show resolved Hide resolved

if (req && options) {
req[kRequestOptions] = undefined;
// If we have pending requests and a socket gets closed make a new one
this.createSocket(req, options, (err, socket) => {
if (err)
Expand All @@ -407,6 +450,7 @@ Agent.prototype.removeSocket = function removeSocket(s, options) {
socket.emit('free');
});
}

};

Agent.prototype.keepSocketAlive = function keepSocketAlive(socket) {
Expand Down
113 changes: 113 additions & 0 deletions test/parallel/test-http-agent-maxtotalsockets.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const http = require('http');
const Countdown = require('../common/countdown');

assert.throws(() => new http.Agent({
maxTotalSockets: 'test',
}), {
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError',
message: 'The "maxTotalSockets" argument must be of type number. ' +
"Received type string ('test')",
});

[-1, 0, NaN].forEach((item) => {
assert.throws(() => new http.Agent({
maxTotalSockets: item,
}), {
code: 'ERR_OUT_OF_RANGE',
name: 'RangeError',
message: 'The value of "maxTotalSockets" is out of range. ' +
`It must be > 0. Received ${item}`,
});
});

assert.ok(new http.Agent({
maxTotalSockets: Infinity,
}));

function start(param = {}) {
const { maxTotalSockets, maxSockets } = param;

const agent = new http.Agent({
keepAlive: true,
keepAliveMsecs: 1000,
maxTotalSockets,
maxSockets,
maxFreeSockets: 3
});

const server = http.createServer(common.mustCall((req, res) => {
res.end('hello world');
}, 6));
const server2 = http.createServer(common.mustCall((req, res) => {
res.end('hello world');
}, 6));

server.keepAliveTimeout = 0;
server2.keepAliveTimeout = 0;

const countdown = new Countdown(12, () => {
assert.strictEqual(getRequestCount(), 0);
agent.destroy();
server.close();
server2.close();
});

function handler(s) {
for (let i = 0; i < 6; i++) {
http.get({
host: 'localhost',
port: s.address().port,
agent,
path: `/${i}`,
}, common.mustCall((res) => {
assert.strictEqual(res.statusCode, 200);
res.resume();
res.on('end', common.mustCall(() => {
for (const key of Object.keys(agent.sockets)) {
assert(agent.sockets[key].length <= maxSockets);
}
assert(getTotalSocketsCount() <= maxTotalSockets);
countdown.dec();
}));
}));
}
}

function getTotalSocketsCount() {
let num = 0;
for (const key of Object.keys(agent.sockets)) {
num += agent.sockets[key].length;
}
return num;
}

function getRequestCount() {
let num = 0;
for (const key of Object.keys(agent.requests)) {
num += agent.requests[key].length;
}
return num;
}

server.listen(0, common.mustCall(() => handler(server)));
server2.listen(0, common.mustCall(() => handler(server2)));
}

// If maxTotalSockets is larger than maxSockets,
// then the origin check will be skipped
// when the socket is removed.
[{
maxTotalSockets: 2,
maxSockets: 3,
}, {
maxTotalSockets: 3,
maxSockets: 2,
}, {
maxTotalSockets: 2,
maxSockets: 2,
}].forEach(start);