From 6e30fe7a7f9ae15f78827884e330104d6e2666a3 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 13 Jul 2020 19:50:41 -0700 Subject: [PATCH] quic: convert openStream to Promise Although most of the time openStream will be able to create the stream immediately, when a stream is opened before the handshake is complete we have to wait for the handshake to be complete before continuing. PR-URL: https://github.com/nodejs/node/pull/34351 Reviewed-By: Robert Nagy Reviewed-By: Anna Henningsen --- lib/internal/quic/core.js | 94 ++++++++++++------- ...t-quic-client-connect-multiple-parallel.js | 8 +- ...quic-client-connect-multiple-sequential.js | 8 +- ...est-quic-client-empty-preferred-address.js | 2 +- test/parallel/test-quic-client-server.js | 10 +- ...test-quic-errors-quicsession-openstream.js | 59 ++++++------ .../parallel/test-quic-http3-client-server.js | 68 +++++++------- test/parallel/test-quic-http3-push.js | 24 +++-- test/parallel/test-quic-http3-trailers.js | 26 +++-- test/parallel/test-quic-ipv6only.js | 4 +- test/parallel/test-quic-process-cleanup.js | 16 ++-- test/parallel/test-quic-qlog.js | 4 +- ...est-quic-quicsession-openstream-pending.js | 62 ------------ test/parallel/test-quic-quicsession-resume.js | 42 +++------ .../parallel/test-quic-quicsession-send-fd.js | 42 ++++----- ...quicsession-send-file-close-before-open.js | 16 ++-- ...uicsession-send-file-open-error-handled.js | 24 +++-- ...t-quic-quicsession-send-file-open-error.js | 20 ++-- ...c-quicsession-server-openstream-pending.js | 57 ----------- ...st-quic-quicsocket-packetloss-stream-rx.js | 2 +- ...st-quic-quicsocket-packetloss-stream-tx.js | 2 +- .../test-quic-quicstream-close-early.js | 44 ++++----- test/parallel/test-quic-quicstream-destroy.js | 2 +- .../test-quic-quicstream-identifiers.js | 87 +++++++++-------- .../test-quic-simple-client-migrate.js | 45 +++++---- test/parallel/test-quic-statelessreset.js | 11 ++- test/parallel/test-quic-with-fake-udp.js | 16 ++-- 27 files changed, 334 insertions(+), 461 deletions(-) delete mode 100644 test/parallel/test-quic-quicsession-openstream-pending.js delete mode 100644 test/parallel/test-quic-quicsession-server-openstream-pending.js diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index 6bc784ad0dbeaf..fd62205f212101 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -19,6 +19,7 @@ const { Promise, PromiseAll, PromiseReject, + PromiseResolve, RegExp, Set, Symbol, @@ -213,13 +214,13 @@ const kDestroy = Symbol('kDestroy'); const kEndpointBound = Symbol('kEndpointBound'); const kEndpointClose = Symbol('kEndpointClose'); const kHandshake = Symbol('kHandshake'); +const kHandshakeComplete = Symbol('kHandshakeComplete'); const kHandshakePost = Symbol('kHandshakePost'); const kHeaders = Symbol('kHeaders'); const kInternalState = Symbol('kInternalState'); const kInternalClientState = Symbol('kInternalClientState'); const kInternalServerState = Symbol('kInternalServerState'); const kListen = Symbol('kListen'); -const kMakeStream = Symbol('kMakeStream'); const kMaybeBind = Symbol('kMaybeBind'); const kOnFileOpened = Symbol('kOnFileOpened'); const kOnFileUnpipe = Symbol('kOnFileUnpipe'); @@ -1651,6 +1652,9 @@ class QuicSession extends EventEmitter { destroyed: false, earlyData: false, handshakeComplete: false, + handshakeCompletePromise: undefined, + handshakeCompletePromiseResolve: undefined, + handshakeCompletePromiseReject: undefined, idleTimeout: false, maxPacketLength: NGTCP2_DEFAULT_MAX_PKTLEN, servername: undefined, @@ -1715,6 +1719,26 @@ class QuicSession extends EventEmitter { }); } + [kHandshakeComplete]() { + const state = this[kInternalState]; + if (state.handshakeComplete) + return PromiseResolve(); + + if (state.handshakeCompletePromise !== undefined) + return state.handshakeCompletePromise; + + state.handshakeCompletePromise = new Promise((resolve, reject) => { + state.handshakeCompletePromiseResolve = resolve; + state.handshakeCompletePromiseReject = reject; + }).finally(() => { + state.handshakeCompletePromise = undefined; + state.handshakeCompletePromiseReject = undefined; + state.handshakeCompletePromiseResolve = undefined; + }); + + return state.handshakeCompletePromise; + } + // Sets the internal handle for the QuicSession instance. For // server QuicSessions, this is called immediately as the // handle is created before the QuicServerSession JS object. @@ -1827,8 +1851,18 @@ class QuicSession extends EventEmitter { state.verifyErrorReason = verifyErrorReason; state.verifyErrorCode = verifyErrorCode; state.earlyData = earlyData; - if (!this[kHandshakePost]()) + + if (!this[kHandshakePost]()) { + if (typeof state.handshakeCompletePromiseReject === 'function') { + // TODO(@jasnell): Proper error + state.handshakeCompletePromiseReject( + new ERR_OPERATION_FAILED('Handshake failed')); + } return; + } + + if (typeof state.handshakeCompletePromiseResolve === 'function') + state.handshakeCompletePromiseResolve(); process.nextTick(() => { try { @@ -1971,6 +2005,12 @@ class QuicSession extends EventEmitter { } else if (typeof state.closePromiseResolve === 'function') state.closePromiseResolve(); + if (typeof state.handshakeCompletePromiseReject === 'function') { + // TODO(@jasnell): Proper error + state.handshakeCompletePromiseReject( + new ERR_OPERATION_FAILED('Handshake failed')); + } + process.nextTick(emit.bind(this, 'close')); } @@ -2113,8 +2153,7 @@ class QuicSession extends EventEmitter { return this[kInternalState].statelessReset; } - openStream(options) { - const state = this[kInternalState]; + async openStream(options) { if (this.destroyed) { throw new ERR_INVALID_STATE( `${this.constructor.name} is already destroyed`); @@ -2123,51 +2162,42 @@ class QuicSession extends EventEmitter { throw new ERR_INVALID_STATE( `${this.constructor.name} is closing`); } + const { halfOpen, // Unidirectional or Bidirectional highWaterMark, defaultEncoding, } = validateQuicStreamOptions(options); - const stream = new QuicStream({ - highWaterMark, - defaultEncoding, - readable: !halfOpen - }, this); + await this[kHandshakeComplete](); - state.pendingStreams.add(stream); - - // If early data is being used, we can create the internal QuicStream on the - // ready event, that is immediately after the internal QuicSession handle - // has been created. Otherwise, we have to wait until the secure event - // signaling the completion of the TLS handshake. - const makeStream = QuicSession[kMakeStream].bind(this, stream, halfOpen); - let deferred = false; - if (!this.handshakeComplete) { - deferred = true; - this.once('secure', makeStream); + if (this.destroyed) { + throw new ERR_INVALID_STATE( + `${this.constructor.name} is already destroyed`); + } + if (this.closing) { + throw new ERR_INVALID_STATE( + `${this.constructor.name} is closing`); } - if (!deferred) - makeStream(stream, halfOpen); - - return stream; - } - - static [kMakeStream](stream, halfOpen) { - this[kInternalState].pendingStreams.delete(stream); const handle = halfOpen ? _openUnidirectionalStream(this[kHandle]) : _openBidirectionalStream(this[kHandle]); - if (handle === undefined) { - stream.destroy(new ERR_OPERATION_FAILED('Unable to create QuicStream')); - return; - } + if (handle === undefined) + throw new ERR_OPERATION_FAILED('Unable to create QuicStream'); + + const stream = new QuicStream({ + highWaterMark, + defaultEncoding, + readable: !halfOpen + }, this); stream[kSetHandle](handle); this[kAddStream](stream.id, stream); + + return stream; } get duration() { diff --git a/test/parallel/test-quic-client-connect-multiple-parallel.js b/test/parallel/test-quic-client-connect-multiple-parallel.js index 06b202204d909a..280524df65df7c 100644 --- a/test/parallel/test-quic-client-connect-multiple-parallel.js +++ b/test/parallel/test-quic-client-connect-multiple-parallel.js @@ -44,11 +44,9 @@ async function connect(server, client) { for (let i = 0; i < kCount; i++) { const server = createQuicSocket({ server: options }); - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall(() => { - const stream = session.openStream({ halfOpen: true }); - stream.end('Hi!'); - })); + server.on('session', common.mustCall(async (session) => { + const stream = await session.openStream({ halfOpen: true }); + stream.end('Hi!'); })); server.on('close', common.mustCall()); diff --git a/test/parallel/test-quic-client-connect-multiple-sequential.js b/test/parallel/test-quic-client-connect-multiple-sequential.js index 8832045a10d3f0..5b1150d1cb1668 100644 --- a/test/parallel/test-quic-client-connect-multiple-sequential.js +++ b/test/parallel/test-quic-client-connect-multiple-sequential.js @@ -45,11 +45,9 @@ async function connect(server, client) { for (let i = 0; i < kCount; i++) { const server = createQuicSocket({ server: options }); - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall(() => { - const stream = session.openStream({ halfOpen: true }); - stream.end('Hi!'); - })); + server.on('session', common.mustCall(async (session) => { + const stream = await session.openStream({ halfOpen: true }); + stream.end('Hi!'); })); server.on('close', common.mustCall()); diff --git a/test/parallel/test-quic-client-empty-preferred-address.js b/test/parallel/test-quic-client-empty-preferred-address.js index 0d55ebe1163f7c..597b312ae8c213 100644 --- a/test/parallel/test-quic-client-empty-preferred-address.js +++ b/test/parallel/test-quic-client-empty-preferred-address.js @@ -42,7 +42,7 @@ const options = { key, cert, ca, alpn: 'zzz' }; preferredAddressPolicy: 'accept', }); - const stream = clientSession.openStream(); + const stream = await clientSession.openStream(); stream.end('hello'); await Promise.all([ diff --git a/test/parallel/test-quic-client-server.js b/test/parallel/test-quic-client-server.js index c112539b1fa4e4..d9b477b7eecb94 100644 --- a/test/parallel/test-quic-client-server.js +++ b/test/parallel/test-quic-client-server.js @@ -129,7 +129,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); }); })); - session.on('secure', common.mustCall((servername, alpn, cipher) => { + session.on('secure', common.mustCall(async (servername, alpn, cipher) => { debug('QuicServerSession TLS Handshake Complete'); debug(' Server name: %s', servername); debug(' ALPN: %s', alpn); @@ -143,7 +143,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); assert(session.authenticated); assert.strictEqual(session.authenticationError, undefined); - const uni = session.openStream({ halfOpen: true }); + const uni = await session.openStream({ halfOpen: true }); assert(uni.unidirectional); assert(!uni.bidirectional); assert(uni.serverInitiated); @@ -221,8 +221,8 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); name: 'Error' }; assert.throws(() => session.ping(), err); - assert.throws(() => session.openStream(), err); assert.throws(() => session.updateKey(), err); + assert.rejects(() => session.openStream(), err); })); })); @@ -264,7 +264,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); debug(' Params: %s', params.toString('hex')); }, 2)); - req.on('secure', common.mustCall((servername, alpn, cipher) => { + req.on('secure', common.mustCall(async (servername, alpn, cipher) => { debug('QuicClientSession TLS Handshake Complete'); debug(' Server name: %s', servername); debug(' ALPN: %s', alpn); @@ -308,7 +308,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); } const file = fs.createReadStream(__filename); - const stream = req.openStream(); + const stream = await req.openStream(); file.pipe(stream); let data = ''; stream.resume(); diff --git a/test/parallel/test-quic-errors-quicsession-openstream.js b/test/parallel/test-quic-errors-quicsession-openstream.js index 879d92cecc2ca1..d608f12d20a2e3 100644 --- a/test/parallel/test-quic-errors-quicsession-openstream.js +++ b/test/parallel/test-quic-errors-quicsession-openstream.js @@ -7,6 +7,7 @@ if (!common.hasQuic) // Test errors thrown when openStream is called incorrectly // or is not permitted +const { once } = require('events'); const { createHook } = require('async_hooks'); const assert = require('assert'); const { createQuicSocket } = require('net'); @@ -18,18 +19,12 @@ createHook({ } }).enable(); -const Countdown = require('../common/countdown'); const { key, cert, ca } = require('../common/quic'); const options = { key, cert, ca, alpn: 'zzz', maxStreamsUni: 0 }; const server = createQuicSocket({ server: options }); const client = createQuicSocket({ client: options }); -const countdown = new Countdown(1, () => { - server.close(); - client.close(); -}); - server.on('close', common.mustCall()); client.on('close', common.mustCall()); @@ -44,40 +39,48 @@ client.on('close', common.mustCall()); port: server.endpoints[0].address.port }); - ['z', 1, {}, [], null, Infinity, 1n].forEach((i) => { - assert.throws( - () => req.openStream({ halfOpen: i }), - { code: 'ERR_INVALID_ARG_TYPE' } - ); - }); + for (const halfOpen of ['z', 1, {}, [], null, Infinity, 1n]) { + await assert.rejects(req.openStream({ halfOpen }), { + code: 'ERR_INVALID_ARG_TYPE' + }); + } - ['', 1n, {}, [], false, 'zebra'].forEach((defaultEncoding) => { - assert.throws(() => req.openStream({ defaultEncoding }), { + for (const defaultEncoding of ['', 1n, {}, [], false, 'zebra']) { + await assert.rejects(req.openStream({ defaultEncoding }), { code: 'ERR_INVALID_ARG_VALUE' }); - }); + } - [-1, Number.MAX_SAFE_INTEGER + 1].forEach((highWaterMark) => { - assert.throws(() => req.openStream({ highWaterMark }), { + for (const highWaterMark of [-1, Number.MAX_SAFE_INTEGER + 1]) { + await assert.rejects(req.openStream({ highWaterMark }), { code: 'ERR_OUT_OF_RANGE' }); - }); + } - ['a', 1n, [], {}, false].forEach((highWaterMark) => { - assert.throws(() => req.openStream({ highWaterMark }), { + for (const highWaterMark of ['a', 1n, [], {}, false]) { + await assert.rejects(req.openStream({ highWaterMark }), { code: 'ERR_INVALID_ARG_TYPE' }); - }); + } // Unidirectional streams are not allowed. openStream will succeeed // but the stream will be destroyed immediately. The underlying // QuicStream C++ handle will not be created. - req.openStream({ - halfOpen: true, - highWaterMark: 10, - defaultEncoding: 'utf16le' - }).on('error', common.expectsError({ - code: 'ERR_OPERATION_FAILED' - })).on('error', common.mustCall(() => countdown.dec())); + await assert.rejects( + req.openStream({ + halfOpen: true, + highWaterMark: 10, + defaultEncoding: 'utf16le' + }), { + code: 'ERR_OPERATION_FAILED' + }); + + server.close(); + client.close(); + + await Promise.all([ + once(server, 'close'), + once(client, 'close') + ]); })().then(common.mustCall()); diff --git a/test/parallel/test-quic-http3-client-server.js b/test/parallel/test-quic-http3-client-server.js index accff790037c48..2367465947af99 100644 --- a/test/parallel/test-quic-http3-client-server.js +++ b/test/parallel/test-quic-http3-client-server.js @@ -113,41 +113,39 @@ const countdown = new Countdown(1, () => { req.on('close', common.mustCall()); const file = fs.createReadStream(__filename); - const stream = req.openStream(); - - stream.on('ready', common.mustCall(() => { - assert(stream.submitInitialHeaders({ - ':method': 'POST', - ':scheme': 'https', - ':authority': 'localhost', - ':path': '/', - })); - file.pipe(stream); - let data = ''; - stream.resume(); - stream.setEncoding('utf8'); - - stream.on('initialHeaders', common.mustCall((headers) => { - const expected = [ - [ ':status', '200' ] - ]; - assert.deepStrictEqual(expected, headers); - debug('Received expected response headers'); - })); - stream.on('informationalHeaders', common.mustNotCall()); - stream.on('trailingHeaders', common.mustNotCall()); - - stream.on('data', (chunk) => data += chunk); - stream.on('finish', common.mustCall()); - stream.on('end', common.mustCall(() => { - assert.strictEqual(data, filedata); - debug('Client received expected data for stream %d', stream.id); - })); - stream.on('close', common.mustCall(() => { - debug('Bidirectional, Client-initiated stream %d closed', stream.id); - countdown.dec(); - })); - debug('Bidirectional, Client-initiated stream %d opened', stream.id); + const stream = await req.openStream(); + + assert(stream.submitInitialHeaders({ + ':method': 'POST', + ':scheme': 'https', + ':authority': 'localhost', + ':path': '/', + })); + file.pipe(stream); + let data = ''; + stream.resume(); + stream.setEncoding('utf8'); + + stream.on('initialHeaders', common.mustCall((headers) => { + const expected = [ + [ ':status', '200' ] + ]; + assert.deepStrictEqual(expected, headers); + debug('Received expected response headers'); + })); + stream.on('informationalHeaders', common.mustNotCall()); + stream.on('trailingHeaders', common.mustNotCall()); + + stream.on('data', (chunk) => data += chunk); + stream.on('finish', common.mustCall()); + stream.on('end', common.mustCall(() => { + assert.strictEqual(data, filedata); + debug('Client received expected data for stream %d', stream.id); + })); + stream.on('close', common.mustCall(() => { + debug('Bidirectional, Client-initiated stream %d closed', stream.id); + countdown.dec(); })); + debug('Bidirectional, Client-initiated stream %d opened', stream.id); })().then(common.mustCall()); diff --git a/test/parallel/test-quic-http3-push.js b/test/parallel/test-quic-http3-push.js index 47f96cde6aeefa..77c4b164b0e57f 100644 --- a/test/parallel/test-quic-http3-push.js +++ b/test/parallel/test-quic-http3-push.js @@ -113,7 +113,7 @@ const countdown = new Countdown(2, () => { req.on('close', common.mustCall()); - const stream = req.openStream(); + const stream = await req.openStream(); stream.on('pushHeaders', common.mustCall((headers, push_id) => { const expected = [ @@ -139,18 +139,16 @@ const countdown = new Countdown(2, () => { countdown.dec(); })); - stream.on('ready', () => { - assert(stream.submitInitialHeaders({ - ':method': 'POST', - ':scheme': 'https', - ':authority': 'localhost', - ':path': '/', - })); + assert(stream.submitInitialHeaders({ + ':method': 'POST', + ':scheme': 'https', + ':authority': 'localhost', + ':path': '/', + })); - stream.end('hello world'); - stream.resume(); - stream.on('finish', common.mustCall()); - stream.on('end', common.mustCall()); - }); + stream.end('hello world'); + stream.resume(); + stream.on('finish', common.mustCall()); + stream.on('end', common.mustCall()); })().then(common.mustCall()); diff --git a/test/parallel/test-quic-http3-trailers.js b/test/parallel/test-quic-http3-trailers.js index 2c65ff7a8a8743..176eacf8aaf586 100644 --- a/test/parallel/test-quic-http3-trailers.js +++ b/test/parallel/test-quic-http3-trailers.js @@ -70,28 +70,26 @@ const countdown = new Countdown(1, () => { req.on('close', common.mustCall()); - const stream = req.openStream(); + const stream = await req.openStream(); stream.on('trailingHeaders', common.mustCall((headers) => { const expected = [ [ 'a', '1' ] ]; assert.deepStrictEqual(expected, headers); })); - stream.on('ready', common.mustCall(() => { - assert(stream.submitInitialHeaders({ - ':method': 'POST', - ':scheme': 'https', - ':authority': 'localhost', - ':path': '/', - })); - - stream.submitTrailingHeaders({ 'b': 2 }); - stream.end('hello world'); - stream.resume(); - stream.on('finish', common.mustCall()); - stream.on('end', common.mustCall()); + assert(stream.submitInitialHeaders({ + ':method': 'POST', + ':scheme': 'https', + ':authority': 'localhost', + ':path': '/', })); + stream.submitTrailingHeaders({ 'b': 2 }); + stream.end('hello world'); + stream.resume(); + stream.on('finish', common.mustCall()); + stream.on('end', common.mustCall()); + stream.on('initialHeaders', common.mustCall((headers) => { const expected = [ [ ':status', '200' ] diff --git a/test/parallel/test-quic-ipv6only.js b/test/parallel/test-quic-ipv6only.js index bfd8fe1de73448..40b027e431dba1 100644 --- a/test/parallel/test-quic-ipv6only.js +++ b/test/parallel/test-quic-ipv6only.js @@ -40,7 +40,7 @@ async function ipv6() { port: server.endpoints[0].address.port }); - const stream = session.openStream({ halfOpen: true }); + const stream = await session.openStream({ halfOpen: true }); stream.end('hello'); await once(stream, 'close'); @@ -99,7 +99,7 @@ async function mismatch() { type: 'udp6', idleTimeout: common.platformTimeout(1), }), { - code: 'ERR_OPERATION_FAILED' + code: 'ERR_QUIC_FAILED_TO_CREATE_SESSION' }); client.close(); diff --git a/test/parallel/test-quic-process-cleanup.js b/test/parallel/test-quic-process-cleanup.js index 10cf321de4a1a0..2f057a52a9e9c3 100644 --- a/test/parallel/test-quic-process-cleanup.js +++ b/test/parallel/test-quic-process-cleanup.js @@ -26,15 +26,13 @@ server.on('close', common.mustNotCall()); client.on('close', common.mustNotCall()); (async function() { - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall((servername, alpn, cipher) => { - const stream = session.openStream({ halfOpen: false }); - stream.write('Hi!'); - stream.on('data', common.mustNotCall()); - stream.on('finish', common.mustNotCall()); - stream.on('close', common.mustNotCall()); - stream.on('end', common.mustNotCall()); - })); + server.on('session', common.mustCall(async (session) => { + const stream = await session.openStream({ halfOpen: false }); + stream.write('Hi!'); + stream.on('data', common.mustNotCall()); + stream.on('finish', common.mustNotCall()); + stream.on('close', common.mustNotCall()); + stream.on('end', common.mustNotCall()); session.on('close', common.mustNotCall()); })); diff --git a/test/parallel/test-quic-qlog.js b/test/parallel/test-quic-qlog.js index debb1ea636f1dc..3d212346e118b7 100644 --- a/test/parallel/test-quic-qlog.js +++ b/test/parallel/test-quic-qlog.js @@ -30,9 +30,9 @@ const client = createQuicSocket({ clientSide.afterBind(); (async function() { - server.on('session', common.mustCall((session) => { + server.on('session', common.mustCall(async (session) => { gatherQlog(session, 'server'); - session.openStream({ halfOpen: true }).end('Hi!'); + (await session.openStream({ halfOpen: true })).end('Hi!'); })); await server.listen(); diff --git a/test/parallel/test-quic-quicsession-openstream-pending.js b/test/parallel/test-quic-quicsession-openstream-pending.js deleted file mode 100644 index a3fefd28644824..00000000000000 --- a/test/parallel/test-quic-quicsession-openstream-pending.js +++ /dev/null @@ -1,62 +0,0 @@ -// Flags: --no-warnings -'use strict'; -const common = require('../common'); -if (!common.hasQuic) - common.skip('missing quic'); - -// Test that opening a stream works even if the session isn’t ready yet. - -const assert = require('assert'); -const { createQuicSocket } = require('net'); -const { key, cert, ca } = require('../common/quic'); -const { once } = require('events'); -const options = { key, cert, ca, alpn: 'meow' }; - -const server = createQuicSocket({ server: options }); -const client = createQuicSocket({ client: options }); - -(async () => { - server.on('session', common.mustCall((session) => { - session.on('stream', common.mustCall(async (stream) => { - let data = ''; - stream.setEncoding('utf8'); - stream.on('data', (chunk) => data += chunk); - await once(stream, 'end'); - assert.strictEqual(data, 'Hello!'); - })); - })); - - await server.listen(); - - const req = await client.connect({ - address: common.localhostIPv4, - port: server.endpoints[0].address.port - }); - - // In this case, the QuicStream is usable but corked - // until the underlying internal QuicStream handle - // has been created, which will not happen until - // after the TLS handshake has been completed. - const stream = req.openStream({ halfOpen: true }); - stream.end('Hello!'); - stream.on('error', common.mustNotCall()); - stream.resume(); - assert(!req.allowEarlyData); - assert(!req.handshakeComplete); - assert(stream.pending); - - await once(stream, 'ready'); - - assert(req.handshakeComplete); - assert(!stream.pending); - - await once(stream, 'close'); - - server.close(); - client.close(); - - await Promise.all([ - once(server, 'close'), - once(client, 'close') - ]); -})().then(common.mustCall()); diff --git a/test/parallel/test-quic-quicsession-resume.js b/test/parallel/test-quic-quicsession-resume.js index 75c42316750589..85e48f48b10b2c 100644 --- a/test/parallel/test-quic-quicsession-resume.js +++ b/test/parallel/test-quic-quicsession-resume.js @@ -59,17 +59,15 @@ const countdown = new Countdown(2, () => { storedParams = params; }, 1)); - req.on('secure', () => { - const stream = req.openStream({ halfOpen: true }); - stream.end('hello'); - stream.resume(); - stream.on('close', () => { - req.close(); - countdown.dec(); - // Wait a turn then start a new session using the stored - // ticket and transportParameters - setImmediate(newSession, storedTicket, storedParams); - }); + const stream = await req.openStream({ halfOpen: true }); + stream.end('hello'); + stream.resume(); + stream.on('close', () => { + req.close(); + countdown.dec(); + // Wait a turn then start a new session using the stored + // ticket and transportParameters + setImmediate(newSession, storedTicket, storedParams); }); async function newSession(sessionTicket, remoteTransportParams) { @@ -82,26 +80,16 @@ const countdown = new Countdown(2, () => { assert(req.allowEarlyData); - const stream = req.openStream({ halfOpen: true }); + const stream = await req.openStream({ halfOpen: true }); stream.end('hello'); stream.on('error', common.mustNotCall()); stream.on('close', common.mustCall(() => countdown.dec())); - // TODO(@jasnell): There's a slight bug in here in that - // calling end() will uncork the stream, causing data to - // be flushed to the C++ layer, which will trigger a - // SendPendingData that will start the handshake. That - // has the effect of short circuiting the intent of - // manual startHandshake(), which makes it not use 0RTT - // for the stream data. - - req.on('secure', common.mustCall(() => { - // TODO(@jasnell): This will be false for now because no - // early data was sent. Once we actually start making - // use of early data on the client side, this should be - // true when the early data was accepted. - assert(!req.usingEarlyData); - })); + // TODO(@jasnell): This will be false for now because no + // early data was sent. Once we actually start making + // use of early data on the client side, this should be + // true when the early data was accepted. + assert(!req.usingEarlyData); } })().then(common.mustCall()); diff --git a/test/parallel/test-quic-quicsession-send-fd.js b/test/parallel/test-quic-quicsession-send-fd.js index 55980a068d5c10..c99d3580bd606d 100644 --- a/test/parallel/test-quic-quicsession-send-fd.js +++ b/test/parallel/test-quic-quicsession-send-fd.js @@ -30,31 +30,29 @@ async function test({ variant, offset, length }) { const client = createQuicSocket({ client: options }); let fd; - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall((servername, alpn, cipher) => { - const stream = session.openStream({ halfOpen: true }); + server.on('session', common.mustCall(async (session) => { + const stream = await session.openStream({ halfOpen: true }); - // The data and end events won't emit because - // the stream is never readable. - stream.on('data', common.mustNotCall()); - stream.on('end', common.mustNotCall()); + // The data and end events won't emit because + // the stream is never readable. + stream.on('data', common.mustNotCall()); + stream.on('end', common.mustNotCall()); - stream.on('finish', common.mustCall()); - stream.on('close', common.mustCall()); + stream.on('finish', common.mustCall()); + stream.on('close', common.mustCall()); - if (variant === 'sendFD') { - fd = fs.openSync(__filename, 'r'); - stream.sendFD(fd, { offset, length }); - } else if (variant === 'sendFD+fileHandle') { - fs.promises.open(__filename, 'r').then(common.mustCall((handle) => { - fd = handle; - stream.sendFD(handle, { offset, length }); - })); - } else { - assert.strictEqual(variant, 'sendFile'); - stream.sendFile(__filename, { offset, length }); - } - })); + if (variant === 'sendFD') { + fd = fs.openSync(__filename, 'r'); + stream.sendFD(fd, { offset, length }); + } else if (variant === 'sendFD+fileHandle') { + fs.promises.open(__filename, 'r').then(common.mustCall((handle) => { + fd = handle; + stream.sendFD(handle, { offset, length }); + })); + } else { + assert.strictEqual(variant, 'sendFile'); + stream.sendFile(__filename, { offset, length }); + } session.on('close', common.mustCall()); })); diff --git a/test/parallel/test-quic-quicsession-send-file-close-before-open.js b/test/parallel/test-quic-quicsession-send-file-close-before-open.js index 64b3f045804fa6..368a8d046e046d 100644 --- a/test/parallel/test-quic-quicsession-send-file-close-before-open.js +++ b/test/parallel/test-quic-quicsession-send-file-close-before-open.js @@ -15,18 +15,16 @@ const server = createQuicSocket({ server: options }); const client = createQuicSocket({ client: options }); (async function() { - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall((servername, alpn, cipher) => { - const stream = session.openStream({ halfOpen: false }); + server.on('session', common.mustCall(async (session) => { + const stream = await session.openStream({ halfOpen: false }); - fs.open = common.mustCall(fs.open); - fs.close = common.mustCall(fs.close); + fs.open = common.mustCall(fs.open); + fs.close = common.mustCall(fs.close); - stream.sendFile(__filename); - stream.destroy(); // Destroy the stream before opening the fd finishes. + stream.sendFile(__filename); + stream.destroy(); // Destroy the stream before opening the fd finishes. - session.close(); - })); + session.close(); session.on('close', common.mustCall()); })); diff --git a/test/parallel/test-quic-quicsession-send-file-open-error-handled.js b/test/parallel/test-quic-quicsession-send-file-open-error-handled.js index c384bc7f4f3aa6..96257b9892a4d3 100644 --- a/test/parallel/test-quic-quicsession-send-file-open-error-handled.js +++ b/test/parallel/test-quic-quicsession-send-file-open-error-handled.js @@ -15,19 +15,17 @@ const server = createQuicSocket({ server: options }); const client = createQuicSocket({ client: options }); (async function() { - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall((servername, alpn, cipher) => { - const stream = session.openStream({ halfOpen: true }); - const nonexistentPath = path.resolve(__dirname, 'nonexistent.file'); - stream.sendFile(nonexistentPath, { - onError: common.expectsError({ - code: 'ENOENT', - syscall: 'open', - path: nonexistentPath - }) - }); - session.close(); - })); + server.on('session', common.mustCall(async (session) => { + const stream = await session.openStream({ halfOpen: true }); + const nonexistentPath = path.resolve(__dirname, 'nonexistent.file'); + stream.sendFile(nonexistentPath, { + onError: common.expectsError({ + code: 'ENOENT', + syscall: 'open', + path: nonexistentPath + }) + }); + session.close(); session.on('close', common.mustCall()); })); diff --git a/test/parallel/test-quic-quicsession-send-file-open-error.js b/test/parallel/test-quic-quicsession-send-file-open-error.js index 628fdf5dbb3868..5d96c5c82b3c89 100644 --- a/test/parallel/test-quic-quicsession-send-file-open-error.js +++ b/test/parallel/test-quic-quicsession-send-file-open-error.js @@ -15,18 +15,16 @@ const server = createQuicSocket({ server: options }); const client = createQuicSocket({ client: options }); (async function() { - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall((servername, alpn, cipher) => { - const stream = session.openStream({ halfOpen: false }); - const nonexistentPath = path.resolve(__dirname, 'nonexistent.file'); - stream.on('error', common.expectsError({ - code: 'ENOENT', - syscall: 'open', - path: nonexistentPath - })); - stream.sendFile(nonexistentPath); - session.close(); + server.on('session', common.mustCall(async (session) => { + const stream = await session.openStream({ halfOpen: false }); + const nonexistentPath = path.resolve(__dirname, 'nonexistent.file'); + stream.on('error', common.expectsError({ + code: 'ENOENT', + syscall: 'open', + path: nonexistentPath })); + stream.sendFile(nonexistentPath); + session.close(); session.on('close', common.mustCall()); })); diff --git a/test/parallel/test-quic-quicsession-server-openstream-pending.js b/test/parallel/test-quic-quicsession-server-openstream-pending.js deleted file mode 100644 index e588b4c26d1812..00000000000000 --- a/test/parallel/test-quic-quicsession-server-openstream-pending.js +++ /dev/null @@ -1,57 +0,0 @@ -// Flags: --no-warnings -'use strict'; -const common = require('../common'); -if (!common.hasQuic) - common.skip('missing quic'); - -// Test that opening a stream works even if the session isn’t ready yet. - -const assert = require('assert'); -const { createQuicSocket } = require('net'); -const { key, cert, ca } = require('../common/quic'); -const { once } = require('events'); -const options = { key, cert, ca, alpn: 'meow' }; - -const server = createQuicSocket({ server: options }); -const client = createQuicSocket({ client: options }); - -(async () => { - server.on('session', common.mustCall((session) => { - // The server can create a stream immediately without waiting - // for the secure event... however, the data will not actually - // be transmitted until the handshake is completed. - const stream = session.openStream({ halfOpen: true }); - stream.on('close', common.mustCall()); - stream.on('error', console.log); - stream.end('hello'); - - session.on('stream', common.mustNotCall()); - })); - - await server.listen(); - - const req = await client.connect({ - address: common.localhostIPv4, - port: server.endpoints[0].address.port, - }); - - const [ stream ] = await once(req, 'stream'); - - let data = ''; - stream.setEncoding('utf8'); - stream.on('data', (chunk) => data += chunk); - stream.on('end', common.mustCall()); - - await once(stream, 'close'); - - assert.strictEqual(data, 'hello'); - - server.close(); - client.close(); - - await Promise.all([ - once(server, 'close'), - once(client, 'close') - ]); - -})().then(common.mustCall()); diff --git a/test/parallel/test-quic-quicsocket-packetloss-stream-rx.js b/test/parallel/test-quic-quicsocket-packetloss-stream-rx.js index c930e14beb1cfe..82f21b4ab699aa 100644 --- a/test/parallel/test-quic-quicsocket-packetloss-stream-rx.js +++ b/test/parallel/test-quic-quicsocket-packetloss-stream-rx.js @@ -64,7 +64,7 @@ const countdown = new Countdown(1, () => { port: server.endpoints[0].address.port, }); - const stream = req.openStream(); + const stream = await req.openStream(); let n = 0; // This forces multiple stream packets to be sent out diff --git a/test/parallel/test-quic-quicsocket-packetloss-stream-tx.js b/test/parallel/test-quic-quicsocket-packetloss-stream-tx.js index 360b42e5f0a004..d811ac0af14996 100644 --- a/test/parallel/test-quic-quicsocket-packetloss-stream-tx.js +++ b/test/parallel/test-quic-quicsocket-packetloss-stream-tx.js @@ -64,7 +64,7 @@ const countdown = new Countdown(1, () => { port: server.endpoints[0].address.port, }); - const stream = req.openStream(); + const stream = await req.openStream(); let n = 0; // This forces multiple stream packets to be sent out diff --git a/test/parallel/test-quic-quicstream-close-early.js b/test/parallel/test-quic-quicstream-close-early.js index 5460c3e2b9b239..10ce6908e29c43 100644 --- a/test/parallel/test-quic-quicstream-close-early.js +++ b/test/parallel/test-quic-quicstream-close-early.js @@ -22,17 +22,15 @@ const countdown = new Countdown(2, () => { }); (async function() { - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall((servername, alpn, cipher) => { - const uni = session.openStream({ halfOpen: true }); - uni.write('hi', common.expectsError()); - uni.on('error', common.mustCall(() => { - assert.strictEqual(uni.aborted, true); - })); - uni.on('data', common.mustNotCall()); - uni.on('close', common.mustCall()); - uni.close(3); + server.on('session', common.mustCall(async (session) => { + const uni = await session.openStream({ halfOpen: true }); + uni.write('hi', common.expectsError()); + uni.on('error', common.mustCall(() => { + assert.strictEqual(uni.aborted, true); })); + uni.on('data', common.mustNotCall()); + uni.on('close', common.mustCall()); + uni.close(3); session.on('stream', common.mustNotCall()); session.on('close', common.mustCall()); })); @@ -44,20 +42,6 @@ const countdown = new Countdown(2, () => { port: server.endpoints[0].address.port, }); - req.on('secure', common.mustCall((servername, alpn, cipher) => { - const stream = req.openStream(); - stream.write('hello', common.expectsError()); - stream.write('there', common.expectsError()); - stream.on('error', common.mustCall(() => { - assert.strictEqual(stream.aborted, true); - })); - stream.on('end', common.mustNotCall()); - stream.on('close', common.mustCall(() => { - countdown.dec(); - })); - stream.close(1); - })); - req.on('stream', common.mustCall((stream) => { stream.on('abort', common.mustNotCall()); stream.on('data', common.mustCall((chunk) => { @@ -69,6 +53,18 @@ const countdown = new Countdown(2, () => { })); })); + const stream = await req.openStream(); + stream.write('hello', common.expectsError()); + stream.write('there', common.expectsError()); + stream.on('error', common.mustCall(() => { + assert.strictEqual(stream.aborted, true); + })); + stream.on('end', common.mustNotCall()); + stream.on('close', common.mustCall(() => { + countdown.dec(); + })); + stream.close(1); + await Promise.all([ once(server, 'close'), once(client, 'close') diff --git a/test/parallel/test-quic-quicstream-destroy.js b/test/parallel/test-quic-quicstream-destroy.js index 480848236e2295..8fb2f3fb1ec36a 100644 --- a/test/parallel/test-quic-quicstream-destroy.js +++ b/test/parallel/test-quic-quicstream-destroy.js @@ -37,7 +37,7 @@ const server = createQuicSocket({ server: options }); port: server.endpoints[0].address.port }); - const stream = req.openStream(); + const stream = await req.openStream(); stream.write('foo'); // Do not explicitly end the stream here. diff --git a/test/parallel/test-quic-quicstream-identifiers.js b/test/parallel/test-quic-quicstream-identifiers.js index f27a256efa8bb4..98ffc3e9a05b17 100644 --- a/test/parallel/test-quic-quicstream-identifiers.js +++ b/test/parallel/test-quic-quicstream-identifiers.js @@ -41,34 +41,33 @@ const countdown = new Countdown(4, () => { const closeHandler = common.mustCall(() => countdown.dec(), 4); (async function() { - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall(() => { - ([3, 1n, [], {}, null, 'meow']).forEach((halfOpen) => { - assert.throws(() => session.openStream({ halfOpen }), { - code: 'ERR_INVALID_ARG_TYPE', + server.on('session', common.mustCall(async (session) => { + ([3, 1n, [], {}, null, 'meow']).forEach((halfOpen) => { + assert.rejects( + session.openStream({ halfOpen }), { + code: 'ERR_INVALID_ARG_TYPE' }); - }); + }); - const uni = session.openStream({ halfOpen: true }); - uni.end('test'); + const uni = await session.openStream({ halfOpen: true }); + uni.end('test'); - const bidi = session.openStream(); - bidi.end('test'); - bidi.resume(); - bidi.on('end', common.mustCall()); + const bidi = await session.openStream(); + bidi.end('test'); + bidi.resume(); + bidi.on('end', common.mustCall()); - assert.strictEqual(uni.id, 3); - assert(uni.unidirectional); - assert(uni.serverInitiated); - assert(!uni.bidirectional); - assert(!uni.clientInitiated); + assert.strictEqual(uni.id, 3); + assert(uni.unidirectional); + assert(uni.serverInitiated); + assert(!uni.bidirectional); + assert(!uni.clientInitiated); - assert.strictEqual(bidi.id, 1); - assert(bidi.bidirectional); - assert(bidi.serverInitiated); - assert(!bidi.unidirectional); - assert(!bidi.clientInitiated); - })); + assert.strictEqual(bidi.id, 1); + assert(bidi.bidirectional); + assert(bidi.serverInitiated); + assert(!bidi.unidirectional); + assert(!bidi.clientInitiated); session.on('stream', common.mustCall((stream) => { assert(stream.clientInitiated); @@ -96,28 +95,26 @@ const closeHandler = common.mustCall(() => countdown.dec(), 4); port: server.endpoints[0].address.port, }); - req.on('secure', common.mustCall(() => { - const bidi = req.openStream(); - bidi.end('test'); - bidi.resume(); - bidi.on('close', closeHandler); - assert.strictEqual(bidi.id, 0); - - assert(bidi.clientInitiated); - assert(bidi.bidirectional); - assert(!bidi.serverInitiated); - assert(!bidi.unidirectional); - - const uni = req.openStream({ halfOpen: true }); - uni.end('test'); - uni.on('close', closeHandler); - assert.strictEqual(uni.id, 2); - - assert(uni.clientInitiated); - assert(!uni.bidirectional); - assert(!uni.serverInitiated); - assert(uni.unidirectional); - })); + const bidi = await req.openStream(); + bidi.end('test'); + bidi.resume(); + bidi.on('close', closeHandler); + assert.strictEqual(bidi.id, 0); + + assert(bidi.clientInitiated); + assert(bidi.bidirectional); + assert(!bidi.serverInitiated); + assert(!bidi.unidirectional); + + const uni = await req.openStream({ halfOpen: true }); + uni.end('test'); + uni.on('close', closeHandler); + assert.strictEqual(uni.id, 2); + + assert(uni.clientInitiated); + assert(!uni.bidirectional); + assert(!uni.serverInitiated); + assert(uni.unidirectional); req.on('stream', common.mustCall((stream) => { assert(stream.serverInitiated); diff --git a/test/parallel/test-quic-simple-client-migrate.js b/test/parallel/test-quic-simple-client-migrate.js index b72911093622e6..d40a693ba3b93d 100644 --- a/test/parallel/test-quic-simple-client-migrate.js +++ b/test/parallel/test-quic-simple-client-migrate.js @@ -22,9 +22,10 @@ const server = createQuicSocket({ server: options }); (async function() { server.on('session', common.mustCall((session) => { - session.on('stream', common.mustCall((stream) => { + session.on('stream', common.mustCall(async (stream) => { pipeline(stream, stream, common.mustCall()); - session.openStream({ halfOpen: true }).end('Hello from the server'); + (await session.openStream({ halfOpen: true })) + .end('Hello from the server'); })); })); @@ -40,27 +41,6 @@ const server = createQuicSocket({ server: options }); server.close(); }); - req.on('secure', common.mustCall(() => { - let data = ''; - const stream = req.openStream(); - stream.setEncoding('utf8'); - stream.on('data', (chunk) => data += chunk); - stream.on('end', common.mustCall(() => { - assert.strictEqual(data, 'Hello from the client'); - })); - stream.on('close', common.mustCall()); - // Send some data on one connection... - stream.write('Hello '); - - // Wait just a bit, then migrate to a different - // QuicSocket and continue sending. - setTimeout(common.mustCall(async () => { - await req.setSocket(client2); - client.close(); - stream.end('from the client'); - }), common.platformTimeout(100)); - })); - req.on('stream', common.mustCall((stream) => { let data = ''; stream.setEncoding('utf8'); @@ -71,6 +51,25 @@ const server = createQuicSocket({ server: options }); stream.on('close', common.mustCall()); })); + let data = ''; + const stream = await req.openStream(); + stream.setEncoding('utf8'); + stream.on('data', (chunk) => data += chunk); + stream.on('end', common.mustCall(() => { + assert.strictEqual(data, 'Hello from the client'); + })); + stream.on('close', common.mustCall()); + // Send some data on one connection... + stream.write('Hello '); + + // Wait just a bit, then migrate to a different + // QuicSocket and continue sending. + setTimeout(common.mustCall(async () => { + await req.setSocket(client2); + client.close(); + stream.end('from the client'); + }), common.platformTimeout(100)); + await Promise.all([ once(server, 'close'), once(client2, 'close') diff --git a/test/parallel/test-quic-statelessreset.js b/test/parallel/test-quic-statelessreset.js index ff1d2c721315ba..70fc8c0fd02751 100644 --- a/test/parallel/test-quic-statelessreset.js +++ b/test/parallel/test-quic-statelessreset.js @@ -57,14 +57,15 @@ server.on('close', common.mustCall(() => { port: server.endpoints[0].address.port, }); - const stream = req.openStream(); - stream.end('hello'); - stream.resume(); - stream.on('close', common.mustCall()); - req.on('close', common.mustCall(() => { assert.strictEqual(req.statelessReset, true); server.close(); client.close(); })); + + const stream = await req.openStream(); + stream.end('hello'); + stream.resume(); + stream.on('close', common.mustCall()); + })().then(common.mustCall()); diff --git a/test/parallel/test-quic-with-fake-udp.js b/test/parallel/test-quic-with-fake-udp.js index 2968d141b81b5f..3baf4ec4a695a2 100644 --- a/test/parallel/test-quic-with-fake-udp.js +++ b/test/parallel/test-quic-with-fake-udp.js @@ -30,16 +30,14 @@ const client = createQuicSocket({ clientSide.afterBind(); (async function() { - server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall(() => { - const stream = session.openStream({ halfOpen: false }); - stream.end('Hi!'); - stream.on('data', common.mustNotCall()); - stream.on('finish', common.mustCall()); - stream.on('close', common.mustNotCall()); - stream.on('end', common.mustNotCall()); - })); + server.on('session', common.mustCall(async (session) => { session.on('close', common.mustNotCall()); + const stream = await session.openStream({ halfOpen: false }); + stream.end('Hi!'); + stream.on('data', common.mustNotCall()); + stream.on('finish', common.mustCall()); + stream.on('close', common.mustNotCall()); + stream.on('end', common.mustNotCall()); })); await server.listen();