Skip to content

Commit b732c92

Browse files
clshortfuseMylesBorins
authored andcommittedNov 16, 2020
http2: use and support non-empty DATA frame with END_STREAM flag
Adds support for reading from a stream where the final frame is a non-empty DATA frame with the END_STREAM flag set, instead of hanging waiting for another frame. When writing to a stream, uses a END_STREAM flag on final DATA frame instead of adding an empty DATA frame. BREAKING: http2 client now expects servers to properly support END_STREAM flag Fixes: #31309 Fixes: #33891 Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback Backport-PR-URL: #34845 PR-URL: #33875 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent bfce0eb commit b732c92

6 files changed

+193
-47
lines changed
 

‎lib/internal/http2/core.js

+84-22
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,7 @@ class Http2Session extends EventEmitter {
11581158
streams: new Map(),
11591159
pendingStreams: new Set(),
11601160
pendingAck: 0,
1161+
shutdownWritableCalled: false,
11611162
writeQueueSize: 0,
11621163
originSet: undefined
11631164
};
@@ -1724,6 +1725,26 @@ function afterShutdown(status) {
17241725
stream[kMaybeDestroy]();
17251726
}
17261727

1728+
function shutdownWritable(callback) {
1729+
const handle = this[kHandle];
1730+
if (!handle) return callback();
1731+
const state = this[kState];
1732+
if (state.shutdownWritableCalled) {
1733+
// Backport v12.x: Session required for debugging stream object
1734+
// debugStreamObj(this, 'shutdownWritable() already called');
1735+
return callback();
1736+
}
1737+
state.shutdownWritableCalled = true;
1738+
1739+
const req = new ShutdownWrap();
1740+
req.oncomplete = afterShutdown;
1741+
req.callback = callback;
1742+
req.handle = handle;
1743+
const err = handle.shutdown(req);
1744+
if (err === 1) // synchronous finish
1745+
return afterShutdown.call(req, 0);
1746+
}
1747+
17271748
function finishSendTrailers(stream, headersList) {
17281749
// The stream might be destroyed and in that case
17291750
// there is nothing to do.
@@ -1983,10 +2004,50 @@ class Http2Stream extends Duplex {
19832004

19842005
let req;
19852006

2007+
let waitingForWriteCallback = true;
2008+
let waitingForEndCheck = true;
2009+
let writeCallbackErr;
2010+
let endCheckCallbackErr;
2011+
const done = () => {
2012+
if (waitingForEndCheck || waitingForWriteCallback) return;
2013+
const err = writeCallbackErr || endCheckCallbackErr;
2014+
// writeGeneric does not destroy on error and
2015+
// we cannot enable autoDestroy,
2016+
// so make sure to destroy on error.
2017+
if (err) {
2018+
this.destroy(err);
2019+
}
2020+
cb(err);
2021+
};
2022+
const writeCallback = (err) => {
2023+
waitingForWriteCallback = false;
2024+
writeCallbackErr = err;
2025+
done();
2026+
};
2027+
const endCheckCallback = (err) => {
2028+
waitingForEndCheck = false;
2029+
endCheckCallbackErr = err;
2030+
done();
2031+
};
2032+
// Shutdown write stream right after last chunk is sent
2033+
// so final DATA frame can include END_STREAM flag
2034+
process.nextTick(() => {
2035+
if (writeCallbackErr ||
2036+
!this._writableState.ending ||
2037+
// Backport v12.x: _writableState.buffered does not exist
2038+
// this._writableState.buffered.length ||
2039+
this._writableState.bufferedRequest ||
2040+
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
2041+
return endCheckCallback();
2042+
// Backport v12.x: Session required for debugging stream object
2043+
// debugStreamObj(this, 'shutting down writable on last write');
2044+
shutdownWritable.call(this, endCheckCallback);
2045+
});
2046+
19862047
if (writev)
1987-
req = writevGeneric(this, data, cb);
2048+
req = writevGeneric(this, data, writeCallback);
19882049
else
1989-
req = writeGeneric(this, data, encoding, cb);
2050+
req = writeGeneric(this, data, encoding, writeCallback);
19902051

19912052
trackWriteState(this, req.bytes);
19922053
}
@@ -2000,21 +2061,13 @@ class Http2Stream extends Duplex {
20002061
}
20012062

20022063
_final(cb) {
2003-
const handle = this[kHandle];
20042064
if (this.pending) {
20052065
this.once('ready', () => this._final(cb));
2006-
} else if (handle !== undefined) {
2007-
debugStreamObj(this, '_final shutting down');
2008-
const req = new ShutdownWrap();
2009-
req.oncomplete = afterShutdown;
2010-
req.callback = cb;
2011-
req.handle = handle;
2012-
const err = handle.shutdown(req);
2013-
if (err === 1) // synchronous finish
2014-
return afterShutdown.call(req, 0);
2015-
} else {
2016-
cb();
2066+
return;
20172067
}
2068+
// Backport v12.x: Session required for debugging stream object
2069+
// debugStreamObj(this, 'shutting down writable on _final');
2070+
shutdownWritable.call(this, cb);
20182071
}
20192072

20202073
_read(nread) {
@@ -2119,11 +2172,20 @@ class Http2Stream extends Duplex {
21192172
debugStream(this[kID] || 'pending', session[kType], 'destroying stream');
21202173

21212174
const state = this[kState];
2122-
const sessionCode = session[kState].goawayCode ||
2123-
session[kState].destroyCode;
2124-
const code = err != null ?
2125-
sessionCode || NGHTTP2_INTERNAL_ERROR :
2126-
state.rstCode || sessionCode;
2175+
const sessionState = session[kState];
2176+
const sessionCode = sessionState.goawayCode || sessionState.destroyCode;
2177+
2178+
// If a stream has already closed successfully, there is no error
2179+
// to report from this stream, even if the session has errored.
2180+
// This can happen if the stream was already in process of destroying
2181+
// after a successful close, but the session had a error between
2182+
// this stream's close and destroy operations.
2183+
// Previously, this always overrode a successful close operation code
2184+
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
2185+
const code = (err != null ?
2186+
(sessionCode || NGHTTP2_INTERNAL_ERROR) :
2187+
(this.closed ? this.rstCode : sessionCode)
2188+
);
21272189
const hasHandle = handle !== undefined;
21282190

21292191
if (!this.closed)
@@ -2132,13 +2194,13 @@ class Http2Stream extends Duplex {
21322194

21332195
if (hasHandle) {
21342196
handle.destroy();
2135-
session[kState].streams.delete(id);
2197+
sessionState.streams.delete(id);
21362198
} else {
2137-
session[kState].pendingStreams.delete(this);
2199+
sessionState.pendingStreams.delete(this);
21382200
}
21392201

21402202
// Adjust the write queue size for accounting
2141-
session[kState].writeQueueSize -= state.writeQueueSize;
2203+
sessionState.writeQueueSize -= state.writeQueueSize;
21422204
state.writeQueueSize = 0;
21432205

21442206
// RST code 8 not emitted as an error as its used by clients to signify

‎src/node_http2.cc

+7-6
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,7 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
811811
// quite expensive. This is a potential performance optimization target later.
812812
ssize_t Http2Session::ConsumeHTTP2Data() {
813813
CHECK_NOT_NULL(stream_buf_.base);
814-
CHECK_LT(stream_buf_offset_, stream_buf_.len);
814+
CHECK_LE(stream_buf_offset_, stream_buf_.len);
815815
size_t read_len = stream_buf_.len - stream_buf_offset_;
816816

817817
// multiple side effects.
@@ -832,11 +832,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
832832
CHECK_GT(ret, 0);
833833
CHECK_LE(static_cast<size_t>(ret), read_len);
834834

835-
if (static_cast<size_t>(ret) < read_len) {
836-
// Mark the remainder of the data as available for later consumption.
837-
stream_buf_offset_ += ret;
838-
return ret;
839-
}
835+
// Mark the remainder of the data as available for later consumption.
836+
// Even if all bytes were received, a paused stream may delay the
837+
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
838+
stream_buf_offset_ += ret;
839+
return ret;
840840
}
841841

842842
// We are done processing the current input chunk.
@@ -1174,6 +1174,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
11741174
if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
11751175
CHECK_NE(session->flags_ & SESSION_STATE_READING_STOPPED, 0);
11761176
session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
1177+
Debug(session, "receive paused");
11771178
return NGHTTP2_ERR_PAUSE;
11781179
}
11791180

‎test/parallel/test-http2-misbehaving-multiplex.js

+39-17
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Flags: --expose-internals
33

44
const common = require('../common');
5+
const assert = require('assert');
56

67
if (!common.hasCrypto)
78
common.skip('missing crypto');
@@ -13,16 +14,36 @@ const h2test = require('../common/http2');
1314
let client;
1415

1516
const server = h2.createServer();
17+
let gotFirstStreamId1;
1618
server.on('stream', common.mustCall((stream) => {
1719
stream.respond();
1820
stream.end('ok');
1921

20-
// The error will be emitted asynchronously
21-
stream.on('error', common.expectsError({
22-
constructor: NghttpError,
23-
code: 'ERR_HTTP2_ERROR',
24-
message: 'Stream was already closed or invalid'
25-
}));
22+
// Http2Server should be fast enough to respond to and close
23+
// the first streams with ID 1 and ID 3 without errors.
24+
25+
// Test for errors in 'close' event to ensure no errors on some streams.
26+
stream.on('error', () => {});
27+
stream.on('close', (err) => {
28+
if (stream.id === 1) {
29+
if (gotFirstStreamId1) {
30+
// We expect our outgoing frames to fail on Stream ID 1 the second time
31+
// because a stream with ID 1 was already closed before.
32+
common.expectsError({
33+
constructor: NghttpError,
34+
code: 'ERR_HTTP2_ERROR',
35+
message: 'Stream was already closed or invalid'
36+
});
37+
return;
38+
}
39+
gotFirstStreamId1 = true;
40+
}
41+
assert.strictEqual(err, undefined);
42+
});
43+
44+
// Stream ID 5 should never reach the server
45+
assert.notStrictEqual(stream.id, 5);
46+
2647
}, 2));
2748

2849
server.on('session', common.mustCall((session) => {
@@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {
3556

3657
const settings = new h2test.SettingsFrame();
3758
const settingsAck = new h2test.SettingsFrame(true);
38-
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
39-
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
40-
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
41-
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
59+
// HeadersFrame(id, payload, padding, END_STREAM)
60+
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
61+
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
62+
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
4263

4364
server.listen(0, () => {
4465
client = net.connect(server.address().port, () => {
4566
client.write(h2test.kClientMagic, () => {
4667
client.write(settings.data, () => {
4768
client.write(settingsAck.data);
48-
// This will make it ok.
49-
client.write(head1.data, () => {
50-
// This will make it ok.
51-
client.write(head2.data, () => {
69+
// Stream ID 1 frame will make it OK.
70+
client.write(id1.data, () => {
71+
// Stream ID 3 frame will make it OK.
72+
client.write(id3.data, () => {
73+
// A second Stream ID 1 frame should fail.
5274
// This will cause an error to occur because the client is
5375
// attempting to reuse an already closed stream. This must
5476
// cause the server session to be torn down.
55-
client.write(head3.data, () => {
56-
// This won't ever make it to the server
57-
client.write(head4.data);
77+
client.write(id1.data, () => {
78+
// This Stream ID 5 frame will never make it to the server
79+
client.write(id5.data);
5880
});
5981
});
6082
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const assert = require('assert');
7+
const http2 = require('http2');
8+
9+
const { PerformanceObserver } = require('perf_hooks');
10+
11+
const server = http2.createServer();
12+
13+
server.on('stream', (stream, headers) => {
14+
stream.respond({
15+
'content-type': 'text/html',
16+
':status': 200
17+
});
18+
switch (headers[':path']) {
19+
case '/singleEnd':
20+
stream.end('OK');
21+
break;
22+
case '/sequentialEnd':
23+
stream.write('OK');
24+
stream.end();
25+
break;
26+
case '/delayedEnd':
27+
stream.write('OK', () => stream.end());
28+
break;
29+
}
30+
});
31+
32+
function testRequest(path, targetFrameCount, callback) {
33+
const obs = new PerformanceObserver((list, observer) => {
34+
const entry = list.getEntries()[0];
35+
if (entry.name !== 'Http2Session') return;
36+
if (entry.type !== 'client') return;
37+
assert.strictEqual(entry.framesReceived, targetFrameCount);
38+
observer.disconnect();
39+
callback();
40+
});
41+
obs.observe({ entryTypes: ['http2'] });
42+
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
43+
const req = client.request({ ':path': path });
44+
req.resume();
45+
req.end();
46+
req.on('end', () => client.close());
47+
});
48+
}
49+
50+
// SETTINGS => SETTINGS => HEADERS => DATA
51+
const MIN_FRAME_COUNT = 4;
52+
53+
server.listen(0, () => {
54+
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
55+
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
56+
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
57+
server.close();
58+
});
59+
});
60+
});
61+
});

‎test/parallel/test-http2-padding-aligned.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair');
2626
// The lengths of the expected writes... note that this is highly
2727
// sensitive to how the internals are implemented.
2828
const serverLengths = [24, 9, 9, 32];
29-
const clientLengths = [9, 9, 48, 9, 1, 21, 1, 16];
29+
const clientLengths = [9, 9, 48, 9, 1, 21, 1];
3030

3131
// Adjust for the 24-byte preamble and two 9-byte settings frames, and
3232
// the result must be equally divisible by 8

‎test/parallel/test-http2-perf_hooks.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => {
3030
break;
3131
case 'client':
3232
assert.strictEqual(entry.streamCount, 1);
33-
assert.strictEqual(entry.framesReceived, 8);
33+
assert.strictEqual(entry.framesReceived, 7);
3434
break;
3535
default:
3636
assert.fail('invalid Http2Session type');

0 commit comments

Comments
 (0)
Please sign in to comment.