From be3c7aceba58f0a58ee4813a9894681d9c8ba56e Mon Sep 17 00:00:00 2001 From: Denys Otrishko Date: Sun, 8 Dec 2019 13:23:19 +0200 Subject: [PATCH] http2: wait for session socket writable end on close/destroy This slightly alters the behaviour of session close by first using .end() on a session socket to finish writing the data and only then calls .destroy() to make sure the Readable side is closed. This allows the socket to finish transmitting data, receive proper FIN packet and avoid ECONNRESET errors upon graceful close. onStreamClose now directly calls stream.destroy() instead of kMaybeDestroy because the latter will first check that the stream has writableFinished set. And that may not be true as we have just (synchronously) called .end() on the stream if it was not closed and that doesn't give it enough time to finish. Furthermore there is no point in waiting for 'finish' as the other party have already closed the stream and we won't be able to write anyway. This also changes a few tests to correctly handle graceful session close. This includes: * not reading request data (on client side) * not reading push stream data (on client side) * relying on socket.destroy() (on client) to finish server session due to the destroy of the socket without closing the server session. As the goaway itself is *not* a session close. Added few 'close' event mustCall checks. PR-URL: https://github.com/nodejs/node/pull/30854 Reviewed-By: Matteo Collina Reviewed-By: Anna Henningsen Reviewed-By: James M Snell Reviewed-By: Rich Trott --- lib/internal/http2/core.js | 107 ++++++++++++------ src/node_http2.cc | 2 +- test/parallel/test-http2-capture-rejection.js | 2 +- test/parallel/test-http2-client-destroy.js | 1 + ...p2-client-stream-destroy-before-connect.js | 9 +- .../test-http2-compat-client-upload-reject.js | 2 + .../test-http2-create-client-connect.js | 4 +- test/parallel/test-http2-goaway-opaquedata.js | 8 +- .../test-http2-ping-settings-heapdump.js | 9 +- .../parallel/test-http2-server-push-stream.js | 2 + 10 files changed, 101 insertions(+), 45 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 3376dc97d2e129..7e3f217617cef0 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -488,7 +488,10 @@ function onStreamClose(code) { if (!stream || stream.destroyed) return false; - debugStreamObj(stream, 'closed with code %d', code); + debugStreamObj( + stream, 'closed with code %d, closed %s, readable %s', + code, stream.closed, stream.readable + ); if (!stream.closed) closeStream(stream, code, kNoRstStream); @@ -497,7 +500,7 @@ function onStreamClose(code) { // Defer destroy we actually emit end. if (!stream.readable || code !== NGHTTP2_NO_ERROR) { // If errored or ended, we can destroy immediately. - stream[kMaybeDestroy](code); + stream.destroy(); } else { // Wait for end to destroy. stream.on('end', stream[kMaybeDestroy]); @@ -985,22 +988,76 @@ function emitClose(self, error) { self.emit('close'); } -function finishSessionDestroy(session, error) { - debugSessionObj(session, 'finishSessionDestroy'); - +function cleanupSession(session) { const socket = session[kSocket]; - if (!socket.destroyed) - socket.destroy(error); - + const handle = session[kHandle]; session[kProxySocket] = undefined; session[kSocket] = undefined; session[kHandle] = undefined; session[kNativeFields] = new Uint8Array(kSessionUint8FieldCount); - socket[kSession] = undefined; - socket[kServer] = undefined; + if (handle) + handle.ondone = null; + if (socket) { + socket[kSession] = undefined; + socket[kServer] = undefined; + } +} + +function finishSessionClose(session, error) { + debugSessionObj(session, 'finishSessionClose'); + + const socket = session[kSocket]; + cleanupSession(session); + + if (socket && !socket.destroyed) { + // Always wait for writable side to finish. + socket.end((err) => { + debugSessionObj(session, 'finishSessionClose socket end', err); + // Due to the way the underlying stream is handled in Http2Session we + // won't get graceful Readable end from the other side even if it was sent + // as the stream is already considered closed and will neither be read + // from nor keep the event loop alive. + // Therefore destroy the socket immediately. + // Fixing this would require some heavy juggling of ReadStart/ReadStop + // mostly on Windows as on Unix it will be fine with just ReadStart + // after this 'ondone' callback. + socket.destroy(error); + emitClose(session, error); + }); + } else { + process.nextTick(emitClose, session, error); + } +} + +function closeSession(session, code, error) { + debugSessionObj(session, 'start closing/destroying'); + + const state = session[kState]; + state.flags |= SESSION_FLAGS_DESTROYED; + state.destroyCode = code; + + // Clear timeout and remove timeout listeners. + session.setTimeout(0); + session.removeAllListeners('timeout'); + + // Destroy any pending and open streams + if (state.pendingStreams.size > 0 || state.streams.size > 0) { + const cancel = new ERR_HTTP2_STREAM_CANCEL(error); + state.pendingStreams.forEach((stream) => stream.destroy(cancel)); + state.streams.forEach((stream) => stream.destroy(error)); + } - // Finally, emit the close and error events (if necessary) on next tick. - process.nextTick(emitClose, session, error); + // Disassociate from the socket and server. + const socket = session[kSocket]; + const handle = session[kHandle]; + + // Destroy the handle if it exists at this point. + if (handle !== undefined) { + handle.ondone = finishSessionClose.bind(null, session, error); + handle.destroy(code, socket.destroyed); + } else { + finishSessionClose(session, error); + } } // Upon creation, the Http2Session takes ownership of the socket. The session @@ -1327,6 +1384,7 @@ class Http2Session extends EventEmitter { destroy(error = NGHTTP2_NO_ERROR, code) { if (this.destroyed) return; + debugSessionObj(this, 'destroying'); if (typeof error === 'number') { @@ -1338,30 +1396,7 @@ class Http2Session extends EventEmitter { if (code === undefined && error != null) code = NGHTTP2_INTERNAL_ERROR; - const state = this[kState]; - state.flags |= SESSION_FLAGS_DESTROYED; - state.destroyCode = code; - - // Clear timeout and remove timeout listeners - this.setTimeout(0); - this.removeAllListeners('timeout'); - - // Destroy any pending and open streams - const cancel = new ERR_HTTP2_STREAM_CANCEL(error); - state.pendingStreams.forEach((stream) => stream.destroy(cancel)); - state.streams.forEach((stream) => stream.destroy(error)); - - // Disassociate from the socket and server - const socket = this[kSocket]; - const handle = this[kHandle]; - - // Destroy the handle if it exists at this point - if (handle !== undefined) { - handle.ondone = finishSessionDestroy.bind(null, this, error); - handle.destroy(code, socket.destroyed); - } else { - finishSessionDestroy(this, error); - } + closeSession(this, code, error); } // Closing the session will: diff --git a/src/node_http2.cc b/src/node_http2.cc index b8d93171329467..1110288f6f9a9d 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1799,7 +1799,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { Context::Scope context_scope(env()->context()); Http2Scope h2scope(this); CHECK_NOT_NULL(stream_); - Debug(this, "receiving %d bytes", nread); + Debug(this, "receiving %d bytes, offset %d", nread, stream_buf_offset_); AllocatedBuffer buf(env(), buf_); // Only pass data on if nread > 0 diff --git a/test/parallel/test-http2-capture-rejection.js b/test/parallel/test-http2-capture-rejection.js index 58f43581eb6bd3..4469c6b7e64d20 100644 --- a/test/parallel/test-http2-capture-rejection.js +++ b/test/parallel/test-http2-capture-rejection.js @@ -72,7 +72,6 @@ events.captureRejections = true; })); } - { // Test error thrown in 'request' event @@ -136,6 +135,7 @@ events.captureRejections = true; const session = connect(`http://localhost:${port}`); const req = session.request(); + req.resume(); session.on('stream', common.mustCall(async (stream) => { session.close(); diff --git a/test/parallel/test-http2-client-destroy.js b/test/parallel/test-http2-client-destroy.js index fe2c9591c7017d..c7a87b9c06a113 100644 --- a/test/parallel/test-http2-client-destroy.js +++ b/test/parallel/test-http2-client-destroy.js @@ -145,6 +145,7 @@ const Countdown = require('../common/countdown'); server.on('stream', common.mustNotCall()); server.listen(0, common.mustCall(() => { const client = h2.connect(`http://localhost:${server.address().port}`); + client.on('close', common.mustCall()); const socket = client[kSocket]; socket.on('close', common.mustCall(() => { assert(socket.destroyed); diff --git a/test/parallel/test-http2-client-stream-destroy-before-connect.js b/test/parallel/test-http2-client-stream-destroy-before-connect.js index d834de5d11ebe7..3083b5a9d1dfa5 100644 --- a/test/parallel/test-http2-client-stream-destroy-before-connect.js +++ b/test/parallel/test-http2-client-stream-destroy-before-connect.js @@ -6,6 +6,7 @@ if (!common.hasCrypto) const assert = require('assert'); const h2 = require('http2'); const NGHTTP2_INTERNAL_ERROR = h2.constants.NGHTTP2_INTERNAL_ERROR; +const Countdown = require('../common/countdown'); const server = h2.createServer(); @@ -27,6 +28,11 @@ server.on('stream', (stream) => { server.listen(0, common.mustCall(() => { const client = h2.connect(`http://localhost:${server.address().port}`); + const countdown = new Countdown(2, () => { + server.close(); + client.close(); + }); + client.on('connect', () => countdown.dec()); const req = client.request(); req.destroy(new Error('test')); @@ -39,8 +45,7 @@ server.listen(0, common.mustCall(() => { req.on('close', common.mustCall(() => { assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR); assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR); - server.close(); - client.close(); + countdown.dec(); })); req.on('response', common.mustNotCall()); diff --git a/test/parallel/test-http2-compat-client-upload-reject.js b/test/parallel/test-http2-compat-client-upload-reject.js index e6a187cb12b264..6e3fee2e7c2ce3 100644 --- a/test/parallel/test-http2-compat-client-upload-reject.js +++ b/test/parallel/test-http2-compat-client-upload-reject.js @@ -23,9 +23,11 @@ fs.readFile(loc, common.mustCall((err, data) => { res.end(); }); })); + server.on('close', common.mustCall()); server.listen(0, common.mustCall(() => { const client = http2.connect(`http://localhost:${server.address().port}`); + client.on('close', common.mustCall()); const req = client.request({ ':method': 'POST' }); req.on('response', common.mustCall((headers) => { diff --git a/test/parallel/test-http2-create-client-connect.js b/test/parallel/test-http2-create-client-connect.js index 02c6c70642acb0..8a4fc9a1d0e075 100644 --- a/test/parallel/test-http2-create-client-connect.js +++ b/test/parallel/test-http2-create-client-connect.js @@ -38,11 +38,13 @@ const URL = url.URL; const client = h2.connect.apply(null, i) .on('connect', common.mustCall(() => maybeClose(client))); + client.on('close', common.mustCall()); }); // Will fail because protocol does not match the server. - h2.connect({ port: port, protocol: 'https:' }) + const client = h2.connect({ port: port, protocol: 'https:' }) .on('error', common.mustCall(() => serverClose.dec())); + client.on('close', common.mustCall()); })); } diff --git a/test/parallel/test-http2-goaway-opaquedata.js b/test/parallel/test-http2-goaway-opaquedata.js index 3f1fb4d7954414..56c0ae168c0c8b 100644 --- a/test/parallel/test-http2-goaway-opaquedata.js +++ b/test/parallel/test-http2-goaway-opaquedata.js @@ -8,20 +8,24 @@ const http2 = require('http2'); const server = http2.createServer(); const data = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]); +let session; server.on('stream', common.mustCall((stream) => { - stream.session.goaway(0, 0, data); + session = stream.session; + session.on('close', common.mustCall()); + session.goaway(0, 0, data); stream.respond(); stream.end(); })); +server.on('close', common.mustCall()); server.listen(0, () => { - const client = http2.connect(`http://localhost:${server.address().port}`); client.once('goaway', common.mustCall((code, lastStreamID, buf) => { assert.deepStrictEqual(code, 0); assert.deepStrictEqual(lastStreamID, 1); assert.deepStrictEqual(data, buf); + session.close(); server.close(); })); const req = client.request(); diff --git a/test/parallel/test-http2-ping-settings-heapdump.js b/test/parallel/test-http2-ping-settings-heapdump.js index 78b3c8cd74f506..7d27310700c7a8 100644 --- a/test/parallel/test-http2-ping-settings-heapdump.js +++ b/test/parallel/test-http2-ping-settings-heapdump.js @@ -30,7 +30,12 @@ for (const variant of ['ping', 'settings']) { })); server.listen(0, common.mustCall(() => { - http2.connect(`http://localhost:${server.address().port}`, - common.mustCall()); + const client = http2.connect(`http://localhost:${server.address().port}`, + common.mustCall()); + client.on('error', (err) => { + // We destroy the session so it's possible to get ECONNRESET here. + if (err.code !== 'ECONNRESET') + throw err; + }); })); } diff --git a/test/parallel/test-http2-server-push-stream.js b/test/parallel/test-http2-server-push-stream.js index 74d41ba4b9c672..450d68f08a95b2 100644 --- a/test/parallel/test-http2-server-push-stream.js +++ b/test/parallel/test-http2-server-push-stream.js @@ -55,6 +55,8 @@ server.listen(0, common.mustCall(() => { assert.strictEqual(headers['x-push-data'], 'pushed by server'); })); stream.on('aborted', common.mustNotCall()); + // We have to read the data of the push stream to end gracefully. + stream.resume(); })); let data = '';