Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: use and support non-empty DATA frame with END_STREAM flag #33875

Closed
wants to merge 14 commits into from
98 changes: 73 additions & 25 deletions lib/internal/http2/core.js
Expand Up @@ -1140,6 +1140,7 @@ class Http2Session extends EventEmitter {
streams: new Map(),
pendingStreams: new Set(),
pendingAck: 0,
shutdownWritableCalled: false,
writeQueueSize: 0,
originSet: undefined
};
Expand Down Expand Up @@ -1718,6 +1719,25 @@ function afterShutdown(status) {
this.callback();
}

function shutdownWritable(callback) {
const handle = this[kHandle];
if (!handle) return callback();
const state = this[kState];
if (state.shutdownWritableCalled) {
debugStreamObj(this, 'shutdownWritable() already called');
return callback();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this lead to a corrupted state where the callback from _final would get called right away and therefore the stream would go on with finish sequence even though we are not done (i.e. ShutdownWrap is not done)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the only way to solve this is to store the cb from _final and call it when real ShutdownWrap finishes (maybe in endCheckCallback).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final won't get called until the last write callback is fired. Writable wraps all the write callbacks

stream._writev(chunk, state.onwrite);
.

It's important the kWriteGeneric function waits for both commands to finish (including Shutdown) before calling the cb function, which will then cause final to eventually be called.

Also, even if Do::Shutdown() is called twice (in write() and in final(), it's of no actual consequence. This check is just an optimization, not a requirement.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's important the kWriteGeneric function waits for both commands to finish (including Shutdown) before calling the cb function, which will then cause final to eventually be called.

I see now, thanks for the explanation. I guess that would work.

ping @ronag could you PTAL at this from streams perspective?

}
state.shutdownWritableCalled = true;

const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = callback;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
}

function finishSendTrailers(stream, headersList) {
// The stream might be destroyed and in that case
// there is nothing to do.
Expand Down Expand Up @@ -1978,19 +1998,47 @@ class Http2Stream extends Duplex {

let req;

// writeGeneric does not destroy on error and we cannot enable autoDestroy,
// so make sure to destroy on error.
const callback = (err) => {
let waitingForWriteCallback = true;
let waitingForEndCheck = true;
let writeCallbackErr;
let endCheckCallbackErr;
const done = () => {
if (waitingForEndCheck || waitingForWriteCallback) return;
const err = writeCallbackErr || endCheckCallbackErr;
// writeGeneric does not destroy on error and
// we cannot enable autoDestroy,
// so make sure to destroy on error.
if (err) {
this.destroy(err);
}
cb(err);
};
const writeCallback = (err) => {
waitingForWriteCallback = false;
writeCallbackErr = err;
done();
};
const endCheckCallback = (err) => {
waitingForEndCheck = false;
endCheckCallbackErr = err;
done();
};
// Shutdown write stream right after last chunk is sent
// so final DATA frame can include END_STREAM flag
process.nextTick(() => {
if (writeCallbackErr ||
!this._writableState.ending ||
this._writableState.buffered.length ||
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
return endCheckCallback();
debugStreamObj(this, 'shutting down writable on last write');
shutdownWritable.call(this, endCheckCallback);
});

if (writev)
req = writevGeneric(this, data, callback);
req = writevGeneric(this, data, writeCallback);
else
req = writeGeneric(this, data, encoding, callback);
req = writeGeneric(this, data, encoding, writeCallback);

trackWriteState(this, req.bytes);
}
Expand All @@ -2004,21 +2052,12 @@ class Http2Stream extends Duplex {
}

_final(cb) {
const handle = this[kHandle];
if (this.pending) {
this.once('ready', () => this._final(cb));
} else if (handle !== undefined) {
debugStreamObj(this, '_final shutting down');
const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = cb;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
} else {
cb();
return;
}
debugStreamObj(this, 'shutting down writable on _final');
shutdownWritable.call(this, cb);
}

_read(nread) {
Expand Down Expand Up @@ -2122,11 +2161,20 @@ class Http2Stream extends Duplex {
debugStream(this[kID] || 'pending', session[kType], 'destroying stream');

const state = this[kState];
const sessionCode = session[kState].goawayCode ||
session[kState].destroyCode;
const code = err != null ?
sessionCode || NGHTTP2_INTERNAL_ERROR :
state.rstCode || sessionCode;
const sessionState = session[kState];
const sessionCode = sessionState.goawayCode || sessionState.destroyCode;

// If a stream has already closed successfully, there is no error
// to report from this stream, even if the session has errored.
// This can happen if the stream was already in process of destroying
// after a successful close, but the session had a error between
// this stream's close and destroy operations.
// Previously, this always overrode a successful close operation code
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
const code = (err != null ?
(sessionCode || NGHTTP2_INTERNAL_ERROR) :
(this.closed ? this.rstCode : sessionCode)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, a stream can now end so quickly, it can close successfully before the next stream can emit an error that bubbles up to the session (see: test-http2-misbehaving-multiplex.js). I had to slip in this bug-fix to account for that.

);
const hasHandle = handle !== undefined;

if (!this.closed)
Expand All @@ -2135,13 +2183,13 @@ class Http2Stream extends Duplex {

if (hasHandle) {
handle.destroy();
session[kState].streams.delete(id);
sessionState.streams.delete(id);
} else {
session[kState].pendingStreams.delete(this);
sessionState.pendingStreams.delete(this);
}

// Adjust the write queue size for accounting
session[kState].writeQueueSize -= state.writeQueueSize;
sessionState.writeQueueSize -= state.writeQueueSize;
state.writeQueueSize = 0;

// RST code 8 not emitted as an error as its used by clients to signify
Expand Down
13 changes: 7 additions & 6 deletions src/node_http2.cc
Expand Up @@ -734,7 +734,7 @@ ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen,
// quite expensive. This is a potential performance optimization target later.
ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_NOT_NULL(stream_buf_.base);
CHECK_LT(stream_buf_offset_, stream_buf_.len);
CHECK_LE(stream_buf_offset_, stream_buf_.len);
size_t read_len = stream_buf_.len - stream_buf_offset_;

// multiple side effects.
Expand All @@ -755,11 +755,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_GT(ret, 0);
CHECK_LE(static_cast<size_t>(ret), read_len);

if (static_cast<size_t>(ret) < read_len) {
// Mark the remainder of the data as available for later consumption.
stream_buf_offset_ += ret;
return ret;
}
// Mark the remainder of the data as available for later consumption.
// Even if all bytes were received, a paused stream may delay the
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
stream_buf_offset_ += ret;
return ret;
}

// We are done processing the current input chunk.
Expand Down Expand Up @@ -1095,6 +1095,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
if (session->is_write_in_progress()) {
CHECK(session->is_reading_stopped());
session->set_receive_paused();
Debug(session, "receive paused");
return NGHTTP2_ERR_PAUSE;
}

Expand Down
56 changes: 39 additions & 17 deletions test/parallel/test-http2-misbehaving-multiplex.js
Expand Up @@ -2,6 +2,7 @@
// Flags: --expose-internals

const common = require('../common');
const assert = require('assert');

if (!common.hasCrypto)
common.skip('missing crypto');
Expand All @@ -13,16 +14,36 @@ const h2test = require('../common/http2');
let client;

const server = h2.createServer();
let gotFirstStreamId1;
server.on('stream', common.mustCall((stream) => {
stream.respond();
stream.end('ok');

// The error will be emitted asynchronously
stream.on('error', common.expectsError({
constructor: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
}));
// Http2Server should be fast enough to respond to and close
// the first streams with ID 1 and ID 3 without errors.

// Test for errors in 'close' event to ensure no errors on some streams.
stream.on('error', () => {});
stream.on('close', (err) => {
if (stream.id === 1) {
if (gotFirstStreamId1) {
// We expect our outgoing frames to fail on Stream ID 1 the second time
// because a stream with ID 1 was already closed before.
common.expectsError({
constructor: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
});
return;
}
gotFirstStreamId1 = true;
}
assert.strictEqual(err, undefined);
});

// Stream ID 5 should never reach the server
assert.notStrictEqual(stream.id, 5);

}, 2));

server.on('session', common.mustCall((session) => {
Expand All @@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {

const settings = new h2test.SettingsFrame();
const settingsAck = new h2test.SettingsFrame(true);
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
// HeadersFrame(id, payload, padding, END_STREAM)
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);

server.listen(0, () => {
client = net.connect(server.address().port, () => {
client.write(h2test.kClientMagic, () => {
client.write(settings.data, () => {
client.write(settingsAck.data);
// This will make it ok.
client.write(head1.data, () => {
// This will make it ok.
client.write(head2.data, () => {
// Stream ID 1 frame will make it OK.
client.write(id1.data, () => {
// Stream ID 3 frame will make it OK.
client.write(id3.data, () => {
// A second Stream ID 1 frame should fail.
// This will cause an error to occur because the client is
// attempting to reuse an already closed stream. This must
// cause the server session to be torn down.
client.write(head3.data, () => {
// This won't ever make it to the server
client.write(head4.data);
client.write(id1.data, () => {
// This Stream ID 5 frame will never make it to the server
client.write(id5.data);
});
});
});
Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-http2-pack-end-stream-flag.js
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');

const { PerformanceObserver } = require('perf_hooks');

const server = http2.createServer();

server.on('stream', (stream, headers) => {
stream.respond({
'content-type': 'text/html',
':status': 200
});
switch (headers[':path']) {
case '/singleEnd':
stream.end('OK');
break;
case '/sequentialEnd':
stream.write('OK');
stream.end();
break;
case '/delayedEnd':
stream.write('OK', () => stream.end());
break;
}
});

function testRequest(path, targetFrameCount, callback) {
const obs = new PerformanceObserver((list, observer) => {
const entry = list.getEntries()[0];
if (entry.name !== 'Http2Session') return;
if (entry.type !== 'client') return;
assert.strictEqual(entry.framesReceived, targetFrameCount);
observer.disconnect();
callback();
});
obs.observe({ entryTypes: ['http2'] });
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
const req = client.request({ ':path': path });
req.resume();
req.end();
req.on('end', () => client.close());
});
}

// SETTINGS => SETTINGS => HEADERS => DATA
const MIN_FRAME_COUNT = 4;

server.listen(0, () => {
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
server.close();
});
});
});
});
2 changes: 1 addition & 1 deletion test/parallel/test-http2-padding-aligned.js
Expand Up @@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair');
// The lengths of the expected writes... note that this is highly
// sensitive to how the internals are implemented.
const serverLengths = [24, 9, 9, 32];
const clientLengths = [9, 9, 48, 9, 1, 21, 1, 16];
const clientLengths = [9, 9, 48, 9, 1, 21, 1];

// Adjust for the 24-byte preamble and two 9-byte settings frames, and
// the result must be equally divisible by 8
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-perf_hooks.js
Expand Up @@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => {
break;
case 'client':
assert.strictEqual(entry.streamCount, 1);
assert.strictEqual(entry.framesReceived, 8);
assert.strictEqual(entry.framesReceived, 7);
break;
default:
assert.fail('invalid Http2Session type');
Expand Down