From 0a629f5f8ee01cf986c0d12d6c1671727061e6c7 Mon Sep 17 00:00:00 2001 From: rogertyang Date: Mon, 18 Jul 2022 19:48:01 +0800 Subject: [PATCH 1/5] http: reuse socket only when it is drained Ensuring every request is assigned to a drained socket or nothing. Because is has no benifit for a request to be attached to a non drained socket and it prevents the request from being assigned to a drained one, which might occur soon or already in the free pool We achieve this by claiming a socket as free only when the socket is drained. --- lib/_http_client.js | 13 ++- lib/_http_outgoing.js | 12 +-- lib/_http_server.js | 1 - ...st-http-agent-reuse-drained-socket-only.js | 79 +++++++++++++++++++ 4 files changed, 94 insertions(+), 11 deletions(-) create mode 100644 test/parallel/test-http-agent-reuse-drained-socket-only.js diff --git a/lib/_http_client.js b/lib/_http_client.js index 66977351870a80..b411e2ea0d4ec3 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -351,7 +351,6 @@ ObjectSetPrototypeOf(ClientRequest.prototype, OutgoingMessage.prototype); ObjectSetPrototypeOf(ClientRequest, OutgoingMessage); ClientRequest.prototype._finish = function _finish() { - FunctionPrototypeCall(OutgoingMessage.prototype._finish, this); if (hasObserver('http')) { startPerf(this, kClientRequestStatistics, { type: 'http', @@ -658,7 +657,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) { // Add our listener first, so that we guarantee socket cleanup res.on('end', responseOnEnd); - req.on('prefinish', requestOnPrefinish); + req.on('finish', requestOnFinish); socket.on('timeout', responseOnTimeout); // If the user did not listen for the 'response' event, then they @@ -730,12 +729,16 @@ function responseOnEnd() { socket.end(); } assert(!socket.writable); - } else if (req.finished && !this.aborted) { + } else if (req.writableFinished && !this.aborted) { + assert(req.finished); // We can assume `req.finished` means all data has been written since: // - `'responseOnEnd'` means we have been assigned a socket. // - when we have a socket we write directly to it without buffering. // - `req.finished` means `end()` has been called and no further data. // can be written + // In addition, `req.writableFinished` means all data written has been + // accepted by the kernel. (i.e. the `req.socket` is drained).Without + // this constraint, we may assign a non drained socket to a request. responseKeepAlive(req); } } @@ -748,7 +751,9 @@ function responseOnTimeout() { res.emit('timeout'); } -function requestOnPrefinish() { +// This function is necessary in the case where we receive the entire reponse +// from server before we finish sending out the request +function requestOnFinish() { const req = this; if (req.shouldKeepAlive && req._ended) diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 006ac437a14938..075782c6022986 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -40,7 +40,6 @@ const { } = primordials; const { getDefaultHighWaterMark } = require('internal/streams/state'); -const assert = require('internal/assert'); const EE = require('events'); const Stream = require('stream'); const internalUtil = require('internal/util'); @@ -985,10 +984,11 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { }; -OutgoingMessage.prototype._finish = function _finish() { - assert(this.socket); - this.emit('prefinish'); -}; +// Subclasses HttpClientRequest and HttpServerResponse should +// implement this function. This function is called once all user data +// are flushed to the socket. Note that it has a chance that the socket +// is not drained. +OutgoingMessage.prototype._finish = function() {}; // This logic is probably a bit confusing. Let me explain a bit: @@ -1008,7 +1008,7 @@ OutgoingMessage.prototype._finish = function _finish() { // the socket yet. Thus the outgoing messages need to be prepared to queue // up data internally before sending it on further to the socket's queue. // -// This function, outgoingFlush(), is called by both the Server and Client +// This function, _flush(), is called by both the Server and Client // to attempt to flush any pending messages out to the socket. OutgoingMessage.prototype._flush = function _flush() { const socket = this.socket; diff --git a/lib/_http_server.js b/lib/_http_server.js index 6e4147a3ca2050..cfe2b700f8ebe9 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -222,7 +222,6 @@ ServerResponse.prototype._finish = function _finish() { }, }); } - OutgoingMessage.prototype._finish.call(this); }; diff --git a/test/parallel/test-http-agent-reuse-drained-socket-only.js b/test/parallel/test-http-agent-reuse-drained-socket-only.js new file mode 100644 index 00000000000000..ac29d872e8f240 --- /dev/null +++ b/test/parallel/test-http-agent-reuse-drained-socket-only.js @@ -0,0 +1,79 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const http = require('http'); +const net = require('net'); + +const agent = new http.Agent({ + keepAlive: true, + maxFreeSockets: 1024 +}); + +const server = net.createServer({ + pauseOnConnect: true, +}, (sock) => { + // Do not read anything from `sock` + sock.pause(); + sock.write('HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: Keep-Alive\r\n\r\n'); +}); + +server.listen(0, common.mustCall(() => { + sendFstReq(server.address().port); +})); + +function sendFstReq(serverPort) { + const req = http.request({ + agent, + host: '127.0.0.1', + port: serverPort, + }, (res) => { + res.on('data', noop); + res.on('end', common.mustCall(() => { + // Agent's socket reusing code is registered to process.nextTick(), + // to ensure it take effect, fire in the next event loop + setTimeout(sendSecReq, 10, serverPort, req.socket.localPort); + })); + }); + + // Overwhelm the flow control window + // note that tcp over the loopback inteface has a large flow control window + assert.strictEqual(req.write('a'.repeat(6_000_000)), false); + + req.end(); +} + +function sendSecReq(serverPort, fstReqCliPort) { + // Make the second request, which should be sent on a new socket + // because the first socket is not drained and hence can not be reused + const req = http.request({ + agent, + host: '127.0.0.1', + port: serverPort, + }, (res) => { + res.on('data', noop); + res.on('end', common.mustCall(() => { + setTimeout(sendThrReq, 10, serverPort, req.socket.localPort); + })); + }); + + req.on('socket', common.mustCall((sock) => { + assert.notStrictEqual(sock.localPort, fstReqCliPort); + })); + req.end(); +} + +function sendThrReq(serverPort, secReqCliPort) { + // Make the third request, the agent should reuse the second socket we just made + const req = http.request({ + agent, + host: '127.0.0.1', + port: serverPort, + }, noop); + + req.on('socket', common.mustCall((sock) => { + assert.strictEqual(sock.localPort, secReqCliPort); + process.exit(0); + })); +} + +function noop() { } From 216383588dd1740471fde4c962cfdff75fdad981 Mon Sep 17 00:00:00 2001 From: rogertyang Date: Wed, 20 Jul 2022 21:00:39 +0800 Subject: [PATCH 2/5] http: reuse socket only when it is drained Preserve 'prefinish' event --- lib/_http_client.js | 1 + lib/_http_outgoing.js | 12 +++++++----- lib/_http_server.js | 1 + 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index b411e2ea0d4ec3..06811722c8af42 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -351,6 +351,7 @@ ObjectSetPrototypeOf(ClientRequest.prototype, OutgoingMessage.prototype); ObjectSetPrototypeOf(ClientRequest, OutgoingMessage); ClientRequest.prototype._finish = function _finish() { + FunctionPrototypeCall(OutgoingMessage.prototype._finish, this); if (hasObserver('http')) { startPerf(this, kClientRequestStatistics, { type: 'http', diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 075782c6022986..dcdea29968590a 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -40,6 +40,7 @@ const { } = primordials; const { getDefaultHighWaterMark } = require('internal/streams/state'); +const assert = require('internal/assert'); const EE = require('events'); const Stream = require('stream'); const internalUtil = require('internal/util'); @@ -984,11 +985,12 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { }; -// Subclasses HttpClientRequest and HttpServerResponse should -// implement this function. This function is called once all user data -// are flushed to the socket. Note that it has a chance that the socket -// is not drained. -OutgoingMessage.prototype._finish = function() {}; +// This function is called once all user data are flushed to the socket. +// Note that it has a chance that the socket is not drained. +OutgoingMessage.prototype._finish = function _finish() { + assert(this.socket); + this.emit('prefinish'); +}; // This logic is probably a bit confusing. Let me explain a bit: diff --git a/lib/_http_server.js b/lib/_http_server.js index cfe2b700f8ebe9..6e4147a3ca2050 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -222,6 +222,7 @@ ServerResponse.prototype._finish = function _finish() { }, }); } + OutgoingMessage.prototype._finish.call(this); }; From 9048e30d938e558e0acaf355479b64801a5e1506 Mon Sep 17 00:00:00 2001 From: rogertyang Date: Sat, 23 Jul 2022 10:32:25 +0800 Subject: [PATCH 3/5] http: reuse socket only when it is drained Fix test, make sure old socket is not drained when we makeing new requests --- .../test-http-agent-reuse-drained-socket-only.js | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/test/parallel/test-http-agent-reuse-drained-socket-only.js b/test/parallel/test-http-agent-reuse-drained-socket-only.js index ac29d872e8f240..4975ce002bd065 100644 --- a/test/parallel/test-http-agent-reuse-drained-socket-only.js +++ b/test/parallel/test-http-agent-reuse-drained-socket-only.js @@ -6,7 +6,9 @@ const net = require('net'); const agent = new http.Agent({ keepAlive: true, - maxFreeSockets: 1024 + maxFreeSockets: Infinity, + maxSockets: Infinity, + maxTotalSockets: Infinity, }); const server = net.createServer({ @@ -30,14 +32,14 @@ function sendFstReq(serverPort) { res.on('data', noop); res.on('end', common.mustCall(() => { // Agent's socket reusing code is registered to process.nextTick(), - // to ensure it take effect, fire in the next event loop - setTimeout(sendSecReq, 10, serverPort, req.socket.localPort); + // and will be run after this function, make sure it take effect. + setImmediate(sendSecReq, serverPort, req.socket.localPort); })); }); - // Overwhelm the flow control window - // note that tcp over the loopback inteface has a large flow control window - assert.strictEqual(req.write('a'.repeat(6_000_000)), false); + // Overwhelm the flow control window, accroding to TCP standard, + // flow control window is up to 1GB in theory. + assert.strictEqual(req.write(Buffer.alloc(1 + 1024 * 1024 * 1024, 0)), false); req.end(); } @@ -52,7 +54,7 @@ function sendSecReq(serverPort, fstReqCliPort) { }, (res) => { res.on('data', noop); res.on('end', common.mustCall(() => { - setTimeout(sendThrReq, 10, serverPort, req.socket.localPort); + setImmediate(sendThrReq, serverPort, req.socket.localPort); })); }); From 8224fe1881195170442aef7b01da0c6d03d57c8e Mon Sep 17 00:00:00 2001 From: rogertyang Date: Sat, 23 Jul 2022 14:54:14 +0800 Subject: [PATCH 4/5] http: reuse socket only when it is drained Fix test, make sure old socket is not drained when we make new requests. --- .../test-http-agent-reuse-drained-socket-only.js | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/test/parallel/test-http-agent-reuse-drained-socket-only.js b/test/parallel/test-http-agent-reuse-drained-socket-only.js index 4975ce002bd065..d85c16e4443bef 100644 --- a/test/parallel/test-http-agent-reuse-drained-socket-only.js +++ b/test/parallel/test-http-agent-reuse-drained-socket-only.js @@ -31,17 +31,20 @@ function sendFstReq(serverPort) { }, (res) => { res.on('data', noop); res.on('end', common.mustCall(() => { + assert.ok(req.writableEnded); // Agent's socket reusing code is registered to process.nextTick(), // and will be run after this function, make sure it take effect. setImmediate(sendSecReq, serverPort, req.socket.localPort); })); }); - // Overwhelm the flow control window, accroding to TCP standard, - // flow control window is up to 1GB in theory. - assert.strictEqual(req.write(Buffer.alloc(1 + 1024 * 1024 * 1024, 0)), false); - - req.end(); + req.on('socket', common.mustCall(() => { + // If `req` is assigned to a socket, `req.write()` write data to the + // socket directly and the return value indicate whether the socket + // is non drained. + assert.strictEqual(req.write(Buffer.alloc(1024 * 1024 * 1024, 0)), false); + req.end(); + })); } function sendSecReq(serverPort, fstReqCliPort) { From 4f8be4aa555e27465a3c9970c7e559ec1b72012a Mon Sep 17 00:00:00 2001 From: rogertyang Date: Mon, 25 Jul 2022 19:06:02 +0800 Subject: [PATCH 5/5] http: reuse socket only when it is drained Fix test in Windows, make sure old socket is not drained when we make new requests. --- ...st-http-agent-reuse-drained-socket-only.js | 54 ++++++++++++++++--- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/test/parallel/test-http-agent-reuse-drained-socket-only.js b/test/parallel/test-http-agent-reuse-drained-socket-only.js index d85c16e4443bef..2bd53f40edaaf3 100644 --- a/test/parallel/test-http-agent-reuse-drained-socket-only.js +++ b/test/parallel/test-http-agent-reuse-drained-socket-only.js @@ -31,20 +31,58 @@ function sendFstReq(serverPort) { }, (res) => { res.on('data', noop); res.on('end', common.mustCall(() => { - assert.ok(req.writableEnded); // Agent's socket reusing code is registered to process.nextTick(), // and will be run after this function, make sure it take effect. setImmediate(sendSecReq, serverPort, req.socket.localPort); })); }); - req.on('socket', common.mustCall(() => { - // If `req` is assigned to a socket, `req.write()` write data to the - // socket directly and the return value indicate whether the socket - // is non drained. - assert.strictEqual(req.write(Buffer.alloc(1024 * 1024 * 1024, 0)), false); - req.end(); - })); + // Make the `req.socket` non drained, i.e. has some data queued to write to + // and accept by the kernel. In Linux and Mac, we only need to call `req.end(aLargeBuffer)`. + // However, in Windows, the mechanism of acceptance is loose, the following code is a workaround + // for Windows. + + /** + * https://docs.microsoft.com/en-US/troubleshoot/windows/win32/data-segment-tcp-winsock says + * + * Winsock uses the following rules to indicate a send completion to the application + * (depending on how the send is invoked, the completion notification could be the + * function returning from a blocking call, signaling an event, or calling a notification + * function, and so forth): + * - If the socket is still within SO_SNDBUF quota, Winsock copies the data from the application + * send and indicates the send completion to the application. + * - If the socket is beyond SO_SNDBUF quota and there's only one previously buffered send still + * in the stack kernel buffer, Winsock copies the data from the application send and indicates + * the send completion to the application. + * - If the socket is beyond SO_SNDBUF quota and there's more than one previously buffered send + * in the stack kernel buffer, Winsock copies the data from the application send. Winsock doesn't + * indicate the send completion to the application until the stack completes enough sends to put + * back the socket within SO_SNDBUF quota or only one outstanding send condition. + */ + + req.on('socket', () => { + req.socket.on('connect', () => { + // Print tcp send buffer information + console.log(process.report.getReport().libuv.filter((handle) => handle.type === 'tcp')); + + const dataLargerThanTCPSendBuf = Buffer.alloc(1024 * 1024 * 64, 0); + + req.write(dataLargerThanTCPSendBuf); + req.uncork(); + if (process.platform === 'win32') { + assert.ok(req.socket.writableLength === 0); + } + + req.write(dataLargerThanTCPSendBuf); + req.uncork(); + if (process.platform === 'win32') { + assert.ok(req.socket.writableLength === 0); + } + + req.end(dataLargerThanTCPSendBuf); + assert.ok(req.socket.writableLength > 0); + }); + }); } function sendSecReq(serverPort, fstReqCliPort) {