Skip to content

Commit

Permalink
quic: convert openStream to Promise
Browse files Browse the repository at this point in the history
Although most of the time openStream will be able to create the stream
immediately, when a stream is opened before the handshake is complete
we have to wait for the handshake to be complete before continuing.

PR-URL: #34351
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
jasnell committed Jul 23, 2020
1 parent 69c78be commit 6e30fe7
Show file tree
Hide file tree
Showing 27 changed files with 334 additions and 461 deletions.
94 changes: 62 additions & 32 deletions lib/internal/quic/core.js
Expand Up @@ -19,6 +19,7 @@ const {
Promise,
PromiseAll,
PromiseReject,
PromiseResolve,
RegExp,
Set,
Symbol,
Expand Down Expand Up @@ -213,13 +214,13 @@ const kDestroy = Symbol('kDestroy');
const kEndpointBound = Symbol('kEndpointBound');
const kEndpointClose = Symbol('kEndpointClose');
const kHandshake = Symbol('kHandshake');
const kHandshakeComplete = Symbol('kHandshakeComplete');
const kHandshakePost = Symbol('kHandshakePost');
const kHeaders = Symbol('kHeaders');
const kInternalState = Symbol('kInternalState');
const kInternalClientState = Symbol('kInternalClientState');
const kInternalServerState = Symbol('kInternalServerState');
const kListen = Symbol('kListen');
const kMakeStream = Symbol('kMakeStream');
const kMaybeBind = Symbol('kMaybeBind');
const kOnFileOpened = Symbol('kOnFileOpened');
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
Expand Down Expand Up @@ -1651,6 +1652,9 @@ class QuicSession extends EventEmitter {
destroyed: false,
earlyData: false,
handshakeComplete: false,
handshakeCompletePromise: undefined,
handshakeCompletePromiseResolve: undefined,
handshakeCompletePromiseReject: undefined,
idleTimeout: false,
maxPacketLength: NGTCP2_DEFAULT_MAX_PKTLEN,
servername: undefined,
Expand Down Expand Up @@ -1715,6 +1719,26 @@ class QuicSession extends EventEmitter {
});
}

[kHandshakeComplete]() {
const state = this[kInternalState];
if (state.handshakeComplete)
return PromiseResolve();

if (state.handshakeCompletePromise !== undefined)
return state.handshakeCompletePromise;

state.handshakeCompletePromise = new Promise((resolve, reject) => {
state.handshakeCompletePromiseResolve = resolve;
state.handshakeCompletePromiseReject = reject;
}).finally(() => {
state.handshakeCompletePromise = undefined;
state.handshakeCompletePromiseReject = undefined;
state.handshakeCompletePromiseResolve = undefined;
});

return state.handshakeCompletePromise;
}

// Sets the internal handle for the QuicSession instance. For
// server QuicSessions, this is called immediately as the
// handle is created before the QuicServerSession JS object.
Expand Down Expand Up @@ -1827,8 +1851,18 @@ class QuicSession extends EventEmitter {
state.verifyErrorReason = verifyErrorReason;
state.verifyErrorCode = verifyErrorCode;
state.earlyData = earlyData;
if (!this[kHandshakePost]())

if (!this[kHandshakePost]()) {
if (typeof state.handshakeCompletePromiseReject === 'function') {
// TODO(@jasnell): Proper error
state.handshakeCompletePromiseReject(
new ERR_OPERATION_FAILED('Handshake failed'));
}
return;
}

if (typeof state.handshakeCompletePromiseResolve === 'function')
state.handshakeCompletePromiseResolve();

process.nextTick(() => {
try {
Expand Down Expand Up @@ -1971,6 +2005,12 @@ class QuicSession extends EventEmitter {
} else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();

if (typeof state.handshakeCompletePromiseReject === 'function') {
// TODO(@jasnell): Proper error
state.handshakeCompletePromiseReject(
new ERR_OPERATION_FAILED('Handshake failed'));
}

process.nextTick(emit.bind(this, 'close'));
}

Expand Down Expand Up @@ -2113,8 +2153,7 @@ class QuicSession extends EventEmitter {
return this[kInternalState].statelessReset;
}

openStream(options) {
const state = this[kInternalState];
async openStream(options) {
if (this.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
Expand All @@ -2123,51 +2162,42 @@ class QuicSession extends EventEmitter {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is closing`);
}

const {
halfOpen, // Unidirectional or Bidirectional
highWaterMark,
defaultEncoding,
} = validateQuicStreamOptions(options);

const stream = new QuicStream({
highWaterMark,
defaultEncoding,
readable: !halfOpen
}, this);
await this[kHandshakeComplete]();

state.pendingStreams.add(stream);

// If early data is being used, we can create the internal QuicStream on the
// ready event, that is immediately after the internal QuicSession handle
// has been created. Otherwise, we have to wait until the secure event
// signaling the completion of the TLS handshake.
const makeStream = QuicSession[kMakeStream].bind(this, stream, halfOpen);
let deferred = false;
if (!this.handshakeComplete) {
deferred = true;
this.once('secure', makeStream);
if (this.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
}
if (this.closing) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is closing`);
}

if (!deferred)
makeStream(stream, halfOpen);

return stream;
}

static [kMakeStream](stream, halfOpen) {
this[kInternalState].pendingStreams.delete(stream);
const handle =
halfOpen ?
_openUnidirectionalStream(this[kHandle]) :
_openBidirectionalStream(this[kHandle]);

if (handle === undefined) {
stream.destroy(new ERR_OPERATION_FAILED('Unable to create QuicStream'));
return;
}
if (handle === undefined)
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');

const stream = new QuicStream({
highWaterMark,
defaultEncoding,
readable: !halfOpen
}, this);

stream[kSetHandle](handle);
this[kAddStream](stream.id, stream);

return stream;
}

get duration() {
Expand Down
8 changes: 3 additions & 5 deletions test/parallel/test-quic-client-connect-multiple-parallel.js
Expand Up @@ -44,11 +44,9 @@ async function connect(server, client) {
for (let i = 0; i < kCount; i++) {
const server = createQuicSocket({ server: options });

server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall(() => {
const stream = session.openStream({ halfOpen: true });
stream.end('Hi!');
}));
server.on('session', common.mustCall(async (session) => {
const stream = await session.openStream({ halfOpen: true });
stream.end('Hi!');
}));

server.on('close', common.mustCall());
Expand Down
8 changes: 3 additions & 5 deletions test/parallel/test-quic-client-connect-multiple-sequential.js
Expand Up @@ -45,11 +45,9 @@ async function connect(server, client) {
for (let i = 0; i < kCount; i++) {
const server = createQuicSocket({ server: options });

server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall(() => {
const stream = session.openStream({ halfOpen: true });
stream.end('Hi!');
}));
server.on('session', common.mustCall(async (session) => {
const stream = await session.openStream({ halfOpen: true });
stream.end('Hi!');
}));

server.on('close', common.mustCall());
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-quic-client-empty-preferred-address.js
Expand Up @@ -42,7 +42,7 @@ const options = { key, cert, ca, alpn: 'zzz' };
preferredAddressPolicy: 'accept',
});

const stream = clientSession.openStream();
const stream = await clientSession.openStream();
stream.end('hello');

await Promise.all([
Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-quic-client-server.js
Expand Up @@ -129,7 +129,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
});
}));

session.on('secure', common.mustCall((servername, alpn, cipher) => {
session.on('secure', common.mustCall(async (servername, alpn, cipher) => {
debug('QuicServerSession TLS Handshake Complete');
debug(' Server name: %s', servername);
debug(' ALPN: %s', alpn);
Expand All @@ -143,7 +143,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
assert(session.authenticated);
assert.strictEqual(session.authenticationError, undefined);

const uni = session.openStream({ halfOpen: true });
const uni = await session.openStream({ halfOpen: true });
assert(uni.unidirectional);
assert(!uni.bidirectional);
assert(uni.serverInitiated);
Expand Down Expand Up @@ -221,8 +221,8 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
name: 'Error'
};
assert.throws(() => session.ping(), err);
assert.throws(() => session.openStream(), err);
assert.throws(() => session.updateKey(), err);
assert.rejects(() => session.openStream(), err);
}));
}));

Expand Down Expand Up @@ -264,7 +264,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
debug(' Params: %s', params.toString('hex'));
}, 2));

req.on('secure', common.mustCall((servername, alpn, cipher) => {
req.on('secure', common.mustCall(async (servername, alpn, cipher) => {
debug('QuicClientSession TLS Handshake Complete');
debug(' Server name: %s', servername);
debug(' ALPN: %s', alpn);
Expand Down Expand Up @@ -308,7 +308,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
}

const file = fs.createReadStream(__filename);
const stream = req.openStream();
const stream = await req.openStream();
file.pipe(stream);
let data = '';
stream.resume();
Expand Down
59 changes: 31 additions & 28 deletions test/parallel/test-quic-errors-quicsession-openstream.js
Expand Up @@ -7,6 +7,7 @@ if (!common.hasQuic)
// Test errors thrown when openStream is called incorrectly
// or is not permitted

const { once } = require('events');
const { createHook } = require('async_hooks');
const assert = require('assert');
const { createQuicSocket } = require('net');
Expand All @@ -18,18 +19,12 @@ createHook({
}
}).enable();

const Countdown = require('../common/countdown');
const { key, cert, ca } = require('../common/quic');

const options = { key, cert, ca, alpn: 'zzz', maxStreamsUni: 0 };
const server = createQuicSocket({ server: options });
const client = createQuicSocket({ client: options });

const countdown = new Countdown(1, () => {
server.close();
client.close();
});

server.on('close', common.mustCall());
client.on('close', common.mustCall());

Expand All @@ -44,40 +39,48 @@ client.on('close', common.mustCall());
port: server.endpoints[0].address.port
});

['z', 1, {}, [], null, Infinity, 1n].forEach((i) => {
assert.throws(
() => req.openStream({ halfOpen: i }),
{ code: 'ERR_INVALID_ARG_TYPE' }
);
});
for (const halfOpen of ['z', 1, {}, [], null, Infinity, 1n]) {
await assert.rejects(req.openStream({ halfOpen }), {
code: 'ERR_INVALID_ARG_TYPE'
});
}

['', 1n, {}, [], false, 'zebra'].forEach((defaultEncoding) => {
assert.throws(() => req.openStream({ defaultEncoding }), {
for (const defaultEncoding of ['', 1n, {}, [], false, 'zebra']) {
await assert.rejects(req.openStream({ defaultEncoding }), {
code: 'ERR_INVALID_ARG_VALUE'
});
});
}

[-1, Number.MAX_SAFE_INTEGER + 1].forEach((highWaterMark) => {
assert.throws(() => req.openStream({ highWaterMark }), {
for (const highWaterMark of [-1, Number.MAX_SAFE_INTEGER + 1]) {
await assert.rejects(req.openStream({ highWaterMark }), {
code: 'ERR_OUT_OF_RANGE'
});
});
}

['a', 1n, [], {}, false].forEach((highWaterMark) => {
assert.throws(() => req.openStream({ highWaterMark }), {
for (const highWaterMark of ['a', 1n, [], {}, false]) {
await assert.rejects(req.openStream({ highWaterMark }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
}

// Unidirectional streams are not allowed. openStream will succeeed
// but the stream will be destroyed immediately. The underlying
// QuicStream C++ handle will not be created.
req.openStream({
halfOpen: true,
highWaterMark: 10,
defaultEncoding: 'utf16le'
}).on('error', common.expectsError({
code: 'ERR_OPERATION_FAILED'
})).on('error', common.mustCall(() => countdown.dec()));
await assert.rejects(
req.openStream({
halfOpen: true,
highWaterMark: 10,
defaultEncoding: 'utf16le'
}), {
code: 'ERR_OPERATION_FAILED'
});

server.close();
client.close();

await Promise.all([
once(server, 'close'),
once(client, 'close')
]);

})().then(common.mustCall());

0 comments on commit 6e30fe7

Please sign in to comment.