Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: reinject data received before http2 is attached #35678

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 13 additions & 2 deletions lib/internal/http2/core.js
Expand Up @@ -1037,7 +1037,7 @@ function finishSessionClose(session, error) {
if (socket && !socket.destroyed) {
// Always wait for writable side to finish.
socket.end((err) => {
debugSessionObj(session, 'finishSessionClose socket end', err);
debugSessionObj(session, 'finishSessionClose socket end', err, error);
// Due to the way the underlying stream is handled in Http2Session we
// won't get graceful Readable end from the other side even if it was sent
// as the stream is already considered closed and will neither be read
Expand All @@ -1055,7 +1055,7 @@ function finishSessionClose(session, error) {
}

function closeSession(session, code, error) {
debugSessionObj(session, 'start closing/destroying');
debugSessionObj(session, 'start closing/destroying', error);

const state = session[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
Expand Down Expand Up @@ -3128,6 +3128,17 @@ function connect(authority, options, listener) {

if (typeof listener === 'function')
session.once('connect', listener);

debug('Http2Session connect', options.createConnection);
// Socket already has some buffered data - emulate receiving it
// https://github.com/nodejs/node/issues/35475
if (typeof options.createConnection === 'function') {
mmomtchev marked this conversation as resolved.
Show resolved Hide resolved
let buf;
while ((buf = socket.read()) !== null) {
debug(`Http2Session connect: injecting ${buf.length} already in buffer`);
session[kHandle].receive(buf);
}
}
return session;
}

Expand Down
28 changes: 28 additions & 0 deletions src/node_http2.cc
Expand Up @@ -1829,6 +1829,33 @@ void Http2Session::Consume(Local<Object> stream_obj) {
Debug(this, "i/o stream consumed");
}

// Allow injecting of data from JS
// This is used when the socket has already some data received
// before our listener was attached
// https://github.com/nodejs/node/issues/35475
void Http2Session::Receive(const FunctionCallbackInfo<Value>& args) {
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
CHECK(args[0]->IsObject());

ArrayBufferViewContents<char> buffer(args[0]);
const char* data = buffer.data();
size_t len = buffer.length();
Debug(session, "Receiving %zu bytes injected from JS", len);

// Copy given buffer
while (len > 0) {
uv_buf_t buf = session->OnStreamAlloc(len);
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
session->OnStreamRead(copy, buf);

data += copy;
len -= copy;
}
}

Http2Stream* Http2Stream::New(Http2Session* session,
int32_t id,
nghttp2_headers_category category,
Expand Down Expand Up @@ -3054,6 +3081,7 @@ void Initialize(Local<Object> target,
env->SetProtoMethod(session, "altsvc", Http2Session::AltSvc);
env->SetProtoMethod(session, "ping", Http2Session::Ping);
env->SetProtoMethod(session, "consume", Http2Session::Consume);
env->SetProtoMethod(session, "receive", Http2Session::Receive);
env->SetProtoMethod(session, "destroy", Http2Session::Destroy);
env->SetProtoMethod(session, "goaway", Http2Session::Goaway);
env->SetProtoMethod(session, "settings", Http2Session::Settings);
Expand Down
1 change: 1 addition & 0 deletions src/node_http2.h
Expand Up @@ -694,6 +694,7 @@ class Http2Session : public AsyncWrap,
// The JavaScript API
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Consume(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Receive(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Destroy(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Settings(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Request(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
64 changes: 64 additions & 0 deletions test/parallel/test-http2-connect-tls-with-delay.js
@@ -0,0 +1,64 @@
// Flags: --expose-internals
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

if (!common.hasMultiLocalhost())
common.skip('platform-specific test.');

const http2 = require('http2');
const assert = require('assert');
const tls = require('tls');
const fixtures = require('../common/fixtures');

const serverOptions = {
key: fixtures.readKey('agent1-key.pem'),
cert: fixtures.readKey('agent1-cert.pem')
};
const server = http2.createSecureServer(serverOptions, (req, res) => {
console.log(`Connect from: ${req.connection.remoteAddress}`);
assert.strictEqual(req.connection.remoteAddress, '127.0.0.2');

req.on('end', common.mustCall(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end(`You are from: ${req.connection.remoteAddress}`);
}));
req.resume();
});

server.listen(0, '127.0.0.1', common.mustCall(() => {
const options = {
ALPNProtocols: ['h2'],
host: '127.0.0.1',
servername: 'localhost',
localAddress: '127.0.0.2',
port: server.address().port,
rejectUnauthorized: false
};

console.log('Server ready', server.address().port);

const socket = tls.connect(options, async () => {

console.log('TLS Connected!');

setTimeout(() => {

const client = http2.connect(
'https://localhost:' + server.address().port,
{ ...options, createConnection: () => socket }
);
const req = client.request({
':path': '/'
});
req.on('data', () => req.resume());
req.on('end', common.mustCall(function() {
client.close();
req.close();
server.close();
}));
req.end();
}, 1000);
});
}));