Skip to content

Commit

Permalink
http2: fix h2-over-h2 connection proxying
Browse files Browse the repository at this point in the history
PR-URL: #52368
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Paolo Insogna <paolo@cowtech.it>
Reviewed-By: Marco Ippolito <marcoippolito54@gmail.com>
  • Loading branch information
pimterry authored and marco-ippolito committed May 3, 2024
1 parent f71a777 commit e010159
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 14 deletions.
19 changes: 10 additions & 9 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ const {
kUpdateTimer,
kHandle,
kSession,
kBoundSession,
setStreamTimeout,
} = require('internal/stream_base_commons');
const { kTimeout } = require('internal/timers');
Expand Down Expand Up @@ -1121,7 +1122,7 @@ function cleanupSession(session) {
if (handle)
handle.ondone = null;
if (socket) {
socket[kSession] = undefined;
socket[kBoundSession] = undefined;
socket[kServer] = undefined;
}
}
Expand Down Expand Up @@ -1235,10 +1236,10 @@ class Http2Session extends EventEmitter {
// If the session property already exists on the socket,
// then it has already been bound to an Http2Session instance
// and cannot be attached again.
if (socket[kSession] !== undefined)
if (socket[kBoundSession] !== undefined)
throw new ERR_HTTP2_SOCKET_BOUND();

socket[kSession] = this;
socket[kBoundSession] = this;

if (!socket._handle || !socket._handle.isStreamBase) {
socket = new JSStreamSocket(socket);
Expand Down Expand Up @@ -1617,7 +1618,7 @@ class Http2Session extends EventEmitter {
}

_onTimeout() {
callTimeout(this);
callTimeout(this, this);
}

ref() {
Expand Down Expand Up @@ -2093,7 +2094,7 @@ class Http2Stream extends Duplex {
}

_onTimeout() {
callTimeout(this, kSession);
callTimeout(this, this[kSession]);
}

// True if the HEADERS frame has been sent
Expand Down Expand Up @@ -2419,7 +2420,7 @@ class Http2Stream extends Duplex {
}
}

function callTimeout(self, kSession) {
function callTimeout(self, session) {
// If the session is destroyed, this should never actually be invoked,
// but just in case...
if (self.destroyed)
Expand All @@ -2430,7 +2431,7 @@ function callTimeout(self, kSession) {
// happens, meaning that if a write is ongoing it should never equal the
// newly fetched, updated value.
if (self[kState].writeQueueSize > 0) {
const handle = kSession ? self[kSession][kHandle] : self[kHandle];
const handle = session[kHandle];
const chunksSentSinceLastWrite = handle !== undefined ?
handle.chunksSentSinceLastWrite : null;
if (chunksSentSinceLastWrite !== null &&
Expand Down Expand Up @@ -3017,7 +3018,7 @@ ObjectDefineProperty(Http2Session.prototype, 'setTimeout', setTimeoutValue);
// When the socket emits an error, destroy the associated Http2Session and
// forward it the same error.
function socketOnError(error) {
const session = this[kSession];
const session = this[kBoundSession];
if (session !== undefined) {
// We can ignore ECONNRESET after GOAWAY was received as there's nothing
// we can do and the other side is fully within its rights to do so.
Expand Down Expand Up @@ -3300,7 +3301,7 @@ function setupCompat(ev) {
}

function socketOnClose() {
const session = this[kSession];
const session = this[kBoundSession];
if (session !== undefined) {
debugSessionObj(session, 'socket closed');
const err = session.connecting ? new ERR_SOCKET_CLOSED() : null;
Expand Down
10 changes: 5 additions & 5 deletions lib/internal/js_stream_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ let debug = require('internal/util/debuglog').debuglog(
);
const { owner_symbol } = require('internal/async_hooks').symbols;
const { ERR_STREAM_WRAP } = require('internal/errors').codes;
const { kSession } = require('internal/stream_base_commons');
const { kBoundSession } = require('internal/stream_base_commons');

const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
Expand Down Expand Up @@ -265,12 +265,12 @@ class JSStreamSocket extends Socket {
});
}

get [kSession]() {
return this.stream[kSession];
get [kBoundSession]() {
return this.stream[kBoundSession];
}

set [kSession](session) {
this.stream[kSession] = session;
set [kBoundSession](session) {
this.stream[kBoundSession] = session;
}
}

Expand Down
2 changes: 2 additions & 0 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
const kHandle = Symbol('kHandle');
const kBoundSession = Symbol('kBoundSession');
const kSession = Symbol('kSession');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
Expand Down Expand Up @@ -255,6 +256,7 @@ function setStreamTimeout(msecs, callback) {
} else {
this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);
if (this[kSession]) this[kSession][kUpdateTimer]();
if (this[kBoundSession]) this[kBoundSession][kUpdateTimer]();

if (callback !== undefined) {
validateFunction(callback, 'callback');
Expand Down
50 changes: 50 additions & 0 deletions test/parallel/test-http2-client-proxy-over-http2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const h2 = require('http2');

const server = h2.createServer();

server.listen(0, common.mustCall(function() {
const proxyClient = h2.connect(`http://localhost:${server.address().port}`);

const request = proxyClient.request({
':method': 'CONNECT',
':authority': 'example.com:80'
});

request.on('response', common.mustCall((connectResponse) => {
assert.strictEqual(connectResponse[':status'], 200);

const proxiedClient = h2.connect('http://example.com', {
createConnection: () => request // Tunnel via first request stream
});

const proxiedRequest = proxiedClient.request();
proxiedRequest.on('response', common.mustCall((proxiedResponse) => {
assert.strictEqual(proxiedResponse[':status'], 204);

proxiedClient.close();
proxyClient.close();
server.close();
}));
}));
}));

server.once('connect', common.mustCall((req, res) => {
assert.strictEqual(req.headers[':method'], 'CONNECT');
res.writeHead(200); // Accept the CONNECT tunnel

// Handle this stream as a new 'proxied' connection (pretend to forward
// but actually just unwrap the tunnel ourselves):
server.emit('connection', res.stream);
}));

// Handle the 'proxied' request itself:
server.once('request', common.mustCall((req, res) => {
res.writeHead(204);
res.end();
}));

0 comments on commit e010159

Please sign in to comment.