Skip to content

Commit

Permalink
http2: support non-empty DATA frame with END_STREAM flag
Browse files Browse the repository at this point in the history
Adds support for reading from a stream where the final frame is a
non-empty DATA frame with the END_STREAM flag set, instead of hanging
waiting for another frame.

Fixes: nodejs#31309
Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback

PR-URL: nodejs#33875
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
clshortfuse committed Aug 20, 2020
1 parent 7e85d4a commit 2a3914d
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 23 deletions.
13 changes: 7 additions & 6 deletions src/node_http2.cc
Expand Up @@ -882,7 +882,7 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
// quite expensive. This is a potential performance optimization target later.
ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_NOT_NULL(stream_buf_.base);
CHECK_LT(stream_buf_offset_, stream_buf_.len);
CHECK_LE(stream_buf_offset_, stream_buf_.len);
size_t read_len = stream_buf_.len - stream_buf_offset_;

// multiple side effects.
Expand All @@ -903,11 +903,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_GT(ret, 0);
CHECK_LE(static_cast<size_t>(ret), read_len);

if (static_cast<size_t>(ret) < read_len) {
// Mark the remainder of the data as available for later consumption.
stream_buf_offset_ += ret;
return ret;
}
// Mark the remainder of the data as available for later consumption.
// Even if all bytes were received, a paused stream may delay the
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
stream_buf_offset_ += ret;
return ret;
}

// We are done processing the current input chunk.
Expand Down Expand Up @@ -1241,6 +1241,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
CHECK_NE(session->flags_ & SESSION_STATE_READING_STOPPED, 0);
session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
Debug(session, "receive paused");
return NGHTTP2_ERR_PAUSE;
}

Expand Down
56 changes: 39 additions & 17 deletions test/parallel/test-http2-misbehaving-multiplex.js
Expand Up @@ -2,6 +2,7 @@
// Flags: --expose-internals

const common = require('../common');
const assert = require('assert');

if (!common.hasCrypto)
common.skip('missing crypto');
Expand All @@ -13,16 +14,36 @@ const h2test = require('../common/http2');
let client;

const server = h2.createServer();
let gotFirstStreamId1;
server.on('stream', common.mustCall((stream) => {
stream.respond();
stream.end('ok');

// the error will be emitted asynchronously
stream.on('error', common.expectsError({
type: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
}));
// Http2Server should be fast enough to respond to and close
// the first streams with ID 1 and ID 3 without errors.

// Test for errors in 'close' event to ensure no errors on some streams.
stream.on('error', () => {});
stream.on('close', (err) => {
if (stream.id === 1) {
if (gotFirstStreamId1) {
// We expect our outgoing frames to fail on Stream ID 1 the second time
// because a stream with ID 1 was already closed before.
common.expectsError({
constructor: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
});
return;
}
gotFirstStreamId1 = true;
}
assert.strictEqual(err, undefined);
});

// Stream ID 5 should never reach the server
assert.notStrictEqual(stream.id, 5);

}, 2));

server.on('session', common.mustCall((session) => {
Expand All @@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {

const settings = new h2test.SettingsFrame();
const settingsAck = new h2test.SettingsFrame(true);
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
// HeadersFrame(id, payload, padding, END_STREAM)
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);

server.listen(0, () => {
client = net.connect(server.address().port, () => {
client.write(h2test.kClientMagic, () => {
client.write(settings.data, () => {
client.write(settingsAck.data);
// This will make it ok.
client.write(head1.data, () => {
// This will make it ok.
client.write(head2.data, () => {
// Stream ID 1 frame will make it OK.
client.write(id1.data, () => {
// Stream ID 3 frame will make it OK.
client.write(id3.data, () => {
// A second Stream ID 1 frame should fail.
// This will cause an error to occur because the client is
// attempting to reuse an already closed stream. This must
// cause the server session to be torn down.
client.write(head3.data, () => {
// This won't ever make it to the server
client.write(head4.data);
client.write(id1.data, () => {
// This Stream ID 5 frame will never make it to the server
client.write(id5.data);
});
});
});
Expand Down
65 changes: 65 additions & 0 deletions test/parallel/test-http2-pack-end-stream-flag.js
@@ -0,0 +1,65 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');

const { PerformanceObserver } = require('perf_hooks');

const server = http2.createServer();

server.on('stream', (stream, headers) => {
stream.respond({
'content-type': 'text/html',
':status': 200
});
switch (headers[':path']) {
case '/singleEnd':
stream.end('OK');
// Backport v10.x: Manually pack END_STREAM flag
stream._final(() => {});
break;
case '/sequentialEnd':
stream.write('OK');
stream.end();
// Backport v10.x: Manually pack END_STREAM flag
stream._final(() => {});
break;
case '/delayedEnd':
stream.write('OK', () => stream.end());
break;
}
});

function testRequest(path, targetFrameCount, callback) {
const obs = new PerformanceObserver((list, observer) => {
const entry = list.getEntries()[0];
if (entry.name !== 'Http2Session') return;
if (entry.type !== 'client') return;
assert.strictEqual(entry.framesReceived, targetFrameCount);
observer.disconnect();
callback();
});
obs.observe({ entryTypes: ['http2'] });
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
const req = client.request({ ':path': path });
req.resume();
req.end();
req.on('end', () => client.close());
});
}

// SETTINGS => SETTINGS => HEADERS => DATA
const MIN_FRAME_COUNT = 4;

server.listen(0, () => {
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
server.close();
});
});
});
});

0 comments on commit 2a3914d

Please sign in to comment.