Skip to content

Commit

Permalink
http2: graceful session close when using .close()
Browse files Browse the repository at this point in the history
This slightly alters the behaviour of session close by using .end() on a
session socket instead of .destroy(). This allows the socket to finish
transmitting data, receive proper FIN packet and avoid ECONNRESET errors
upon graceful close.

Now after the session is closed we call ReadStart() on the underlying
stream to allow socket to receive the remaining data and FIN packet.
Previously only ReadStop() was used therefore blocking the receival of
FIN by the socket and 'end' event after .end() call.

onStreamClose now directly calls stream.destroy() instead of
kMaybeDestroy because the latter will first check that the stream has
writableFinished set. And that may not be true as we have just
(synchronously) called .end() on the stream if it was not closed and
that doesn't give it enough time to finish. Furthermore there is no
point in waiting for 'finish' as the other party have already closed the
stream and we won't be able to write anyway.

Few tests got changed because of this. They weren't correctly handling
graceful session close. This includes:
* not reading request data (on client side)
* not reading push stream data (on client side)
* relying on socket.destroy() (on client) to finish server session
  due to the destroy of the socket without closing the server session.
  As the goaway itself is *not* a session close.

This also led to a few missing 'close' session events. Added appropriate
mustCall checks.
  • Loading branch information
lundibundi committed Dec 8, 2019
1 parent b1e6a9e commit 71ef973
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 40 deletions.
109 changes: 74 additions & 35 deletions lib/internal/http2/core.js
Expand Up @@ -486,7 +486,10 @@ function onStreamClose(code) {
if (!stream || stream.destroyed)
return false;

debugStreamObj(stream, 'closed with code %d', code);
debugStreamObj(
stream, 'closed with code %d, closed %s, readable %s',
code, stream.closed, stream.readable
);

if (!stream.closed)
closeStream(stream, code, kNoRstStream);
Expand All @@ -495,7 +498,7 @@ function onStreamClose(code) {
// Defer destroy we actually emit end.
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
stream[kMaybeDestroy](code);
stream.destroy();
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
Expand Down Expand Up @@ -983,24 +986,79 @@ function emitClose(self, error) {
self.emit('close');
}

function finishSessionDestroy(session, error) {
debugSessionObj(session, 'finishSessionDestroy');

function cleanupSession(session) {
const socket = session[kSocket];
if (!socket.destroyed)
socket.destroy(error);

session[kProxySocket] = undefined;
session[kSocket] = undefined;
session[kHandle] = undefined;
session[kNativeFields] = new Uint8Array(kSessionUint8FieldCount);
socket[kSession] = undefined;
socket[kServer] = undefined;
if (socket) {
socket[kSession] = undefined;
socket[kServer] = undefined;
}
}

function finishSessionDestroy(session, error) {
debugSessionObj(session, 'finishSessionDestroy');

const handle = session[kHandle];
const socket = session[kSocket];
if (handle) handle.ondone = null;
cleanupSession(session);

if (socket && !socket.destroyed)
socket.destroy(error);

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, session, error);
}

function finishSessionClose(session) {
debugSessionObj(session, 'finishSessionClose');

const socket = session[kSocket];
const handle = session[kHandle];
if (handle) handle.ondone = null;
cleanupSession(session);

if (!socket.destroyed) {
socket.end(() => emitClose(session));
} else {
process.nextTick(emitClose, session);
}
}

function sessionClose(session, code, error, finishfn) {
debugSessionObj(session, 'start closing/destroying');

const state = session[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
state.destroyCode = code;

// Clear timeout and remove timeout listeners.
session.setTimeout(0);
session.removeAllListeners('timeout');

// Destroy any pending and open streams
if (state.pendingStreams.size > 0 || state.streams.size > 0) {
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
state.streams.forEach((stream) => stream.destroy(error));
}

// Disassociate from the socket and server.
const socket = session[kSocket];
const handle = session[kHandle];

// Destroy the handle if it exists at this point.
if (handle !== undefined) {
handle.ondone = finishfn.bind(null, session, error);
handle.destroy(code, socket.destroyed);
} else {
finishfn(session, error);
}
}

// Upon creation, the Http2Session takes ownership of the socket. The session
// may not be ready to use immediately if the socket is not yet fully connected.
// In that case, the Http2Session will wait for the socket to connect. Once
Expand Down Expand Up @@ -1325,6 +1383,7 @@ class Http2Session extends EventEmitter {
destroy(error = NGHTTP2_NO_ERROR, code) {
if (this.destroyed)
return;

debugSessionObj(this, 'destroying');

if (typeof error === 'number') {
Expand All @@ -1336,30 +1395,7 @@ class Http2Session extends EventEmitter {
if (code === undefined && error != null)
code = NGHTTP2_INTERNAL_ERROR;

const state = this[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
state.destroyCode = code;

// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');

// Destroy any pending and open streams
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
state.streams.forEach((stream) => stream.destroy(error));

// Disassociate from the socket and server
const socket = this[kSocket];
const handle = this[kHandle];

// Destroy the handle if it exists at this point
if (handle !== undefined) {
handle.ondone = finishSessionDestroy.bind(null, this, error);
handle.destroy(code, socket.destroyed);
} else {
finishSessionDestroy(this, error);
}
sessionClose(this, code, error, finishSessionDestroy);
}

// Closing the session will:
Expand Down Expand Up @@ -1405,12 +1441,15 @@ class Http2Session extends EventEmitter {
const state = this[kState];
// Do not destroy if we're not closed and there are pending/open streams
if (!this.closed ||
this.destroyed ||
state.streams.size > 0 ||
state.pendingStreams.size > 0) {
return;
}
sessionClose(this, NGHTTP2_NO_ERROR, null, finishSessionClose);
} else {
this.destroy(error);
}
this.destroy(error);
}

_onTimeout() {
Expand Down
5 changes: 4 additions & 1 deletion src/node_http2.cc
Expand Up @@ -767,6 +767,7 @@ void Http2Session::Close(uint32_t code, bool socket_closed) {
if ((flags_ & SESSION_STATE_WRITE_IN_PROGRESS) == 0) {
Debug(this, "make done session callback");
MakeCallback(env()->ondone_string(), 0, nullptr);
if (stream_ != nullptr) stream_->ReadStart();
}

// If there are outstanding pings, those will need to be canceled, do
Expand Down Expand Up @@ -1566,7 +1567,9 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {

if ((flags_ & SESSION_STATE_READING_STOPPED) &&
!(flags_ & SESSION_STATE_WRITE_IN_PROGRESS) &&
nghttp2_session_want_read(session_)) {
(nghttp2_session_want_read(session_) ||
(flags_ & SESSION_STATE_CLOSED) != 0)) {
Debug(this, "OnStreamAfterWrite read start");
flags_ &= ~SESSION_STATE_READING_STOPPED;
stream_->ReadStart();
}
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-capture-rejection.js
Expand Up @@ -72,7 +72,6 @@ events.captureRejections = true;
}));
}


{
// Test error thrown in 'request' event

Expand Down Expand Up @@ -136,6 +135,7 @@ events.captureRejections = true;
const session = connect(`http://localhost:${port}`);

const req = session.request();
req.resume();

session.on('stream', common.mustCall(async (stream) => {
session.close();
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-http2-compat-client-upload-reject.js
Expand Up @@ -23,9 +23,11 @@ fs.readFile(loc, common.mustCall((err, data) => {
res.end();
});
}));
server.on('close', common.mustCall());

server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`);
client.on('close', common.mustCall());

const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall((headers) => {
Expand Down
4 changes: 3 additions & 1 deletion test/parallel/test-http2-create-client-connect.js
Expand Up @@ -38,11 +38,13 @@ const URL = url.URL;
const client =
h2.connect.apply(null, i)
.on('connect', common.mustCall(() => maybeClose(client)));
client.on('close', common.mustCall());
});

// Will fail because protocol does not match the server.
h2.connect({ port: port, protocol: 'https:' })
const client = h2.connect({ port: port, protocol: 'https:' })
.on('error', common.mustCall(() => serverClose.dec()));
client.on('close', common.mustCall());
}));
}

Expand Down
8 changes: 6 additions & 2 deletions test/parallel/test-http2-goaway-opaquedata.js
Expand Up @@ -8,20 +8,24 @@ const http2 = require('http2');

const server = http2.createServer();
const data = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]);
let session;

server.on('stream', common.mustCall((stream) => {
stream.session.goaway(0, 0, data);
session = stream.session;
session.on('close', common.mustCall());
session.goaway(0, 0, data);
stream.respond();
stream.end();
}));
server.on('close', common.mustCall());

server.listen(0, () => {

const client = http2.connect(`http://localhost:${server.address().port}`);
client.once('goaway', common.mustCall((code, lastStreamID, buf) => {
assert.deepStrictEqual(code, 0);
assert.deepStrictEqual(lastStreamID, 1);
assert.deepStrictEqual(data, buf);
session.close();
server.close();
}));
const req = client.request();
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-http2-server-push-stream.js
Expand Up @@ -55,6 +55,8 @@ server.listen(0, common.mustCall(() => {
assert.strictEqual(headers['x-push-data'], 'pushed by server');
}));
stream.on('aborted', common.mustNotCall());
// We have to read the data of the push stream to end gracefully.
stream.resume();
}));

let data = '';
Expand Down

0 comments on commit 71ef973

Please sign in to comment.