diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index a7f963f221b326..acdf8e8e03cd4b 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -501,7 +501,10 @@ function onStreamClose(code) { if (!stream || stream.destroyed) return false; - debugStreamObj(stream, 'closed with code %d', code); + debugStreamObj( + stream, 'closed with code %d, closed %s, readable %s', + code, stream.closed, stream.readable + ); if (!stream.closed) closeStream(stream, code, kNoRstStream); @@ -510,7 +513,7 @@ function onStreamClose(code) { // Defer destroy we actually emit end. if (!stream.readable || code !== NGHTTP2_NO_ERROR) { // If errored or ended, we can destroy immediately. - stream[kMaybeDestroy](code); + stream.destroy(); } else { // Wait for end to destroy. stream.on('end', stream[kMaybeDestroy]); @@ -1021,22 +1024,76 @@ function emitClose(self, error) { self.emit('close'); } -function finishSessionDestroy(session, error) { - debugSessionObj(session, 'finishSessionDestroy'); - +function cleanupSession(session) { const socket = session[kSocket]; - if (!socket.destroyed) - socket.destroy(error); - + const handle = session[kHandle]; session[kProxySocket] = undefined; session[kSocket] = undefined; session[kHandle] = undefined; session[kNativeFields] = new Uint8Array(kSessionUint8FieldCount); - socket[kSession] = undefined; - socket[kServer] = undefined; + if (handle) + handle.ondone = null; + if (socket) { + socket[kSession] = undefined; + socket[kServer] = undefined; + } +} + +function finishSessionClose(session, error) { + debugSessionObj(session, 'finishSessionClose'); + + const socket = session[kSocket]; + cleanupSession(session); + + if (socket && !socket.destroyed) { + // Always wait for writable side to finish. + socket.end((err) => { + debugSessionObj(session, 'finishSessionClose socket end', err); + // Due to the way the underlying stream is handled in Http2Session we + // won't get graceful Readable end from the other side even if it was sent + // as the stream is already considered closed and will neither be read + // from nor keep the event loop alive. + // Therefore destroy the socket immediately. + // Fixing this would require some heavy juggling of ReadStart/ReadStop + // mostly on Windows as on Unix it will be fine with just ReadStart + // after this 'ondone' callback. + socket.destroy(error); + emitClose(session, error); + }); + } else { + process.nextTick(emitClose, session, error); + } +} + +function closeSession(session, code, error) { + debugSessionObj(session, 'start closing/destroying'); + + const state = session[kState]; + state.flags |= SESSION_FLAGS_DESTROYED; + state.destroyCode = code; + + // Clear timeout and remove timeout listeners. + session.setTimeout(0); + session.removeAllListeners('timeout'); + + // Destroy any pending and open streams + if (state.pendingStreams.size > 0 || state.streams.size > 0) { + const cancel = new ERR_HTTP2_STREAM_CANCEL(error); + state.pendingStreams.forEach((stream) => stream.destroy(cancel)); + state.streams.forEach((stream) => stream.destroy(error)); + } - // Finally, emit the close and error events (if necessary) on next tick. - process.nextTick(emitClose, session, error); + // Disassociate from the socket and server. + const socket = session[kSocket]; + const handle = session[kHandle]; + + // Destroy the handle if it exists at this point. + if (handle !== undefined) { + handle.ondone = finishSessionClose.bind(null, session, error); + handle.destroy(code, socket.destroyed); + } else { + finishSessionClose(session, error); + } } // Upon creation, the Http2Session takes ownership of the socket. The session @@ -1363,6 +1420,7 @@ class Http2Session extends EventEmitter { destroy(error = NGHTTP2_NO_ERROR, code) { if (this.destroyed) return; + debugSessionObj(this, 'destroying'); if (typeof error === 'number') { @@ -1374,30 +1432,7 @@ class Http2Session extends EventEmitter { if (code === undefined && error != null) code = NGHTTP2_INTERNAL_ERROR; - const state = this[kState]; - state.flags |= SESSION_FLAGS_DESTROYED; - state.destroyCode = code; - - // Clear timeout and remove timeout listeners - this.setTimeout(0); - this.removeAllListeners('timeout'); - - // Destroy any pending and open streams - const cancel = new ERR_HTTP2_STREAM_CANCEL(error); - state.pendingStreams.forEach((stream) => stream.destroy(cancel)); - state.streams.forEach((stream) => stream.destroy(error)); - - // Disassociate from the socket and server - const socket = this[kSocket]; - const handle = this[kHandle]; - - // Destroy the handle if it exists at this point - if (handle !== undefined) { - handle.ondone = finishSessionDestroy.bind(null, this, error); - handle.destroy(code, socket.destroyed); - } else { - finishSessionDestroy(this, error); - } + closeSession(this, code, error); } // Closing the session will: diff --git a/src/node_http2.cc b/src/node_http2.cc index d317d1f32ccaba..76ef72005a3087 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1827,7 +1827,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { Context::Scope context_scope(env()->context()); Http2Scope h2scope(this); CHECK_NOT_NULL(stream_); - Debug(this, "receiving %d bytes", nread); + Debug(this, "receiving %d bytes, offset %d", nread, stream_buf_offset_); AllocatedBuffer buf(env(), buf_); // Only pass data on if nread > 0 diff --git a/test/parallel/test-http2-capture-rejection.js b/test/parallel/test-http2-capture-rejection.js index 58f43581eb6bd3..4469c6b7e64d20 100644 --- a/test/parallel/test-http2-capture-rejection.js +++ b/test/parallel/test-http2-capture-rejection.js @@ -72,7 +72,6 @@ events.captureRejections = true; })); } - { // Test error thrown in 'request' event @@ -136,6 +135,7 @@ events.captureRejections = true; const session = connect(`http://localhost:${port}`); const req = session.request(); + req.resume(); session.on('stream', common.mustCall(async (stream) => { session.close(); diff --git a/test/parallel/test-http2-client-destroy.js b/test/parallel/test-http2-client-destroy.js index 35ee20723c15b7..88850b9db51ccc 100644 --- a/test/parallel/test-http2-client-destroy.js +++ b/test/parallel/test-http2-client-destroy.js @@ -145,6 +145,7 @@ const Countdown = require('../common/countdown'); server.on('stream', common.mustNotCall()); server.listen(0, common.mustCall(() => { const client = h2.connect(`http://localhost:${server.address().port}`); + client.on('close', common.mustCall()); const socket = client[kSocket]; socket.on('close', common.mustCall(() => { assert(socket.destroyed); diff --git a/test/parallel/test-http2-client-stream-destroy-before-connect.js b/test/parallel/test-http2-client-stream-destroy-before-connect.js index 902657bd58fb3e..09667750ae16fd 100644 --- a/test/parallel/test-http2-client-stream-destroy-before-connect.js +++ b/test/parallel/test-http2-client-stream-destroy-before-connect.js @@ -6,6 +6,7 @@ if (!common.hasCrypto) const assert = require('assert'); const h2 = require('http2'); const NGHTTP2_INTERNAL_ERROR = h2.constants.NGHTTP2_INTERNAL_ERROR; +const Countdown = require('../common/countdown'); const server = h2.createServer(); @@ -27,6 +28,11 @@ server.on('stream', (stream) => { server.listen(0, common.mustCall(() => { const client = h2.connect(`http://localhost:${server.address().port}`); + const countdown = new Countdown(2, () => { + server.close(); + client.close(); + }); + client.on('connect', () => countdown.dec()); const req = client.request(); req.destroy(new Error('test')); @@ -39,8 +45,7 @@ server.listen(0, common.mustCall(() => { req.on('close', common.mustCall(() => { assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR); assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR); - server.close(); - client.close(); + countdown.dec(); })); req.on('response', common.mustNotCall()); diff --git a/test/parallel/test-http2-compat-client-upload-reject.js b/test/parallel/test-http2-compat-client-upload-reject.js index e6a187cb12b264..6e3fee2e7c2ce3 100644 --- a/test/parallel/test-http2-compat-client-upload-reject.js +++ b/test/parallel/test-http2-compat-client-upload-reject.js @@ -23,9 +23,11 @@ fs.readFile(loc, common.mustCall((err, data) => { res.end(); }); })); + server.on('close', common.mustCall()); server.listen(0, common.mustCall(() => { const client = http2.connect(`http://localhost:${server.address().port}`); + client.on('close', common.mustCall()); const req = client.request({ ':method': 'POST' }); req.on('response', common.mustCall((headers) => { diff --git a/test/parallel/test-http2-create-client-connect.js b/test/parallel/test-http2-create-client-connect.js index 02c6c70642acb0..8a4fc9a1d0e075 100644 --- a/test/parallel/test-http2-create-client-connect.js +++ b/test/parallel/test-http2-create-client-connect.js @@ -38,11 +38,13 @@ const URL = url.URL; const client = h2.connect.apply(null, i) .on('connect', common.mustCall(() => maybeClose(client))); + client.on('close', common.mustCall()); }); // Will fail because protocol does not match the server. - h2.connect({ port: port, protocol: 'https:' }) + const client = h2.connect({ port: port, protocol: 'https:' }) .on('error', common.mustCall(() => serverClose.dec())); + client.on('close', common.mustCall()); })); } diff --git a/test/parallel/test-http2-goaway-opaquedata.js b/test/parallel/test-http2-goaway-opaquedata.js index 3f1fb4d7954414..56c0ae168c0c8b 100644 --- a/test/parallel/test-http2-goaway-opaquedata.js +++ b/test/parallel/test-http2-goaway-opaquedata.js @@ -8,20 +8,24 @@ const http2 = require('http2'); const server = http2.createServer(); const data = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]); +let session; server.on('stream', common.mustCall((stream) => { - stream.session.goaway(0, 0, data); + session = stream.session; + session.on('close', common.mustCall()); + session.goaway(0, 0, data); stream.respond(); stream.end(); })); +server.on('close', common.mustCall()); server.listen(0, () => { - const client = http2.connect(`http://localhost:${server.address().port}`); client.once('goaway', common.mustCall((code, lastStreamID, buf) => { assert.deepStrictEqual(code, 0); assert.deepStrictEqual(lastStreamID, 1); assert.deepStrictEqual(data, buf); + session.close(); server.close(); })); const req = client.request(); diff --git a/test/parallel/test-http2-ping-settings-heapdump.js b/test/parallel/test-http2-ping-settings-heapdump.js index 78b3c8cd74f506..7d27310700c7a8 100644 --- a/test/parallel/test-http2-ping-settings-heapdump.js +++ b/test/parallel/test-http2-ping-settings-heapdump.js @@ -30,7 +30,12 @@ for (const variant of ['ping', 'settings']) { })); server.listen(0, common.mustCall(() => { - http2.connect(`http://localhost:${server.address().port}`, - common.mustCall()); + const client = http2.connect(`http://localhost:${server.address().port}`, + common.mustCall()); + client.on('error', (err) => { + // We destroy the session so it's possible to get ECONNRESET here. + if (err.code !== 'ECONNRESET') + throw err; + }); })); } diff --git a/test/parallel/test-http2-server-push-stream.js b/test/parallel/test-http2-server-push-stream.js index 95dd065bde9dd3..43e0e8d9320928 100644 --- a/test/parallel/test-http2-server-push-stream.js +++ b/test/parallel/test-http2-server-push-stream.js @@ -55,6 +55,8 @@ server.listen(0, common.mustCall(() => { assert.strictEqual(headers['x-push-data'], 'pushed by server'); })); stream.on('aborted', common.mustNotCall()); + // We have to read the data of the push stream to end gracefully. + stream.resume(); })); let data = '';