Skip to content

Commit

Permalink
quic: refactor to use more primordials
Browse files Browse the repository at this point in the history
PR-URL: #36211
Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
aduh95 authored and danielleadams committed Dec 7, 2020
1 parent 617cb58 commit d29199e
Showing 1 changed file with 93 additions and 63 deletions.
156 changes: 93 additions & 63 deletions lib/internal/quic/core.js
Expand Up @@ -11,16 +11,23 @@ assertCrypto();

const {
ArrayFrom,
ArrayPrototypePush,
BigInt64Array,
Boolean,
Error,
FunctionPrototypeBind,
FunctionPrototypeCall,
Map,
Number,
Promise,
PromiseAll,
PromisePrototypeThen,
PromisePrototypeCatch,
PromisePrototypeFinally,
PromiseReject,
PromiseResolve,
Set,
ReflectApply,
SafeSet,
Symbol,
SymbolFor,
} = primordials;
Expand Down Expand Up @@ -302,22 +309,25 @@ function onSessionClose(code, family, silent, statelessReset) {
// being requested. It is only called if the 'clientHelloHandler' option is
// specified on listen().
function onSessionClientHello(alpn, servername, ciphers) {
this[owner_symbol][kClientHello](alpn, servername, ciphers)
.then((context) => {
PromisePrototypeThen(
this[owner_symbol][kClientHello](alpn, servername, ciphers),
(context) => {
if (context !== undefined && !context?.context)
throw new ERR_INVALID_ARG_TYPE('context', 'SecureContext', context);
this.onClientHelloDone(context?.context);
})
.catch((error) => this[owner_symbol].destroy(error));
},
(error) => this[owner_symbol].destroy(error)
);
}

// This callback is only ever invoked for QuicServerSession instances,
// and is used to trigger OCSP request processing when needed. The
// user callback must invoke .onCertDone() in order for the
// TLS handshake to continue.
function onSessionCert(servername) {
this[owner_symbol][kHandleOcsp](servername)
.then((data) => {
PromisePrototypeThen(
this[owner_symbol][kHandleOcsp](servername),
(data) => {
if (data !== undefined) {
if (typeof data === 'string')
data = Buffer.from(data);
Expand All @@ -329,17 +339,20 @@ function onSessionCert(servername) {
}
}
this.onCertDone(data);
})
.catch((error) => this[owner_symbol].destroy(error));
},
(error) => this[owner_symbol].destroy(error)
);
}

// This callback is only ever invoked for QuicClientSession instances,
// and is used to deliver the OCSP response as provided by the server.
// If the requestOCSP configuration option is false, this will never
// be called.
function onSessionStatus(data) {
this[owner_symbol][kHandleOcsp](data)
.catch((error) => this[owner_symbol].destroy(error));
PromisePrototypeCatch(
this[owner_symbol][kHandleOcsp](data),
(error) => this[owner_symbol].destroy(error)
);
}

// Called by the C++ internals when the TLS handshake is completed.
Expand Down Expand Up @@ -369,12 +382,13 @@ function onSessionHandshake(
// resumption and 0RTT.
function onSessionTicket(sessionTicket, transportParams) {
if (this[owner_symbol]) {
process.nextTick(
emit.bind(
this[owner_symbol],
'sessionTicket',
sessionTicket,
transportParams));
process.nextTick(FunctionPrototypeBind(
emit,
this[owner_symbol],
'sessionTicket',
sessionTicket,
transportParams
));
}
}

Expand All @@ -384,13 +398,14 @@ function onSessionTicket(sessionTicket, transportParams) {
function onSessionPathValidation(res, local, remote) {
const session = this[owner_symbol];
if (session) {
process.nextTick(
emit.bind(
session,
'pathValidation',
res === NGTCP2_PATH_VALIDATION_RESULT_FAILURE ? 'failure' : 'success',
local,
remote));
process.nextTick(FunctionPrototypeBind(
emit,
session,
'pathValidation',
res === NGTCP2_PATH_VALIDATION_RESULT_FAILURE ? 'failure' : 'success',
local,
remote
));
}
}

Expand Down Expand Up @@ -486,7 +501,7 @@ function onStreamHeaders(id, headers, kind, push_id) {
// When a stream is flow control blocked, causes a blocked event
// to be emitted. This is a purely informational event.
function onStreamBlocked() {
process.nextTick(emit.bind(this[owner_symbol], 'blocked'));
process.nextTick(FunctionPrototypeBind(emit, this[owner_symbol], 'blocked'));
}

// Register the callbacks with the QUIC internal binding.
Expand Down Expand Up @@ -543,14 +558,17 @@ function addressOrLocalhost(address, type) {
}

function deferredClosePromise(state) {
return state.closePromise = new Promise((resolve, reject) => {
state.closePromiseResolve = resolve;
state.closePromiseReject = reject;
}).finally(() => {
state.closePromise = undefined;
state.closePromiseResolve = undefined;
state.closePromiseReject = undefined;
});
return state.closePromise = PromisePrototypeFinally(
new Promise((resolve, reject) => {
state.closePromiseResolve = resolve;
state.closePromiseReject = reject;
}),
() => {
state.closePromise = undefined;
state.closePromiseResolve = undefined;
state.closePromiseReject = undefined;
}
);
}

async function resolvePreferredAddress(lookup, preferredAddress) {
Expand Down Expand Up @@ -640,7 +658,7 @@ class QuicEndpoint {
if (state.bindPromise !== undefined)
return state.bindPromise;

return state.bindPromise = this[kBind]().finally(() => {
return state.bindPromise = PromisePrototypeFinally(this[kBind](), () => {
state.bindPromise = undefined;
});
}
Expand Down Expand Up @@ -899,7 +917,7 @@ class QuicSocket extends EventEmitter {
closePromiseResolve: undefined,
closePromiseReject: undefined,
defaultEncoding: undefined,
endpoints: new Set(),
endpoints: new SafeSet(),
highWaterMark: undefined,
listenPending: false,
listenPromise: undefined,
Expand All @@ -908,7 +926,7 @@ class QuicSocket extends EventEmitter {
clientHelloHandler: undefined,
server: undefined,
serverSecureContext: undefined,
sessions: new Set(),
sessions: new SafeSet(),
state: kSocketUnbound,
sharedState: undefined,
stats: undefined,
Expand Down Expand Up @@ -1048,9 +1066,12 @@ class QuicSocket extends EventEmitter {
if (state.bindPromise !== undefined)
return state.bindPromise;

return state.bindPromise = this[kBind](options).finally(() => {
state.bindPromise = undefined;
});
return state.bindPromise = PromisePrototypeFinally(
this[kBind](options),
() => {
state.bindPromise = undefined;
}
);
}

async [kBind](options) {
Expand All @@ -1074,7 +1095,7 @@ class QuicSocket extends EventEmitter {

const binds = [];
for (const endpoint of state.endpoints)
binds.push(endpoint.bind({ signal }));
ArrayPrototypePush(binds, endpoint.bind({ signal }));

await PromiseAll(binds);

Expand Down Expand Up @@ -1169,9 +1190,12 @@ class QuicSocket extends EventEmitter {
if (state.listenPromise !== undefined)
return state.listenPromise;

return state.listenPromise = this[kListen](options).finally(() => {
state.listenPromise = undefined;
});
return state.listenPromise = PromisePrototypeFinally(
this[kListen](options),
() => {
state.listenPromise = undefined;
}
);
}

async [kListen](options) {
Expand Down Expand Up @@ -1388,8 +1412,9 @@ class QuicSocket extends EventEmitter {
// Otherwise, loop through each of the known sessions and close them.
const reqs = [promise];
for (const session of state.sessions) {
reqs.push(session.close()
.catch((error) => this.destroy(error)));
ArrayPrototypePush(reqs,
PromisePrototypeCatch(session.close(),
(error) => this.destroy(error)));
}
return PromiseAll(reqs);
}
Expand Down Expand Up @@ -1441,11 +1466,11 @@ class QuicSocket extends EventEmitter {
if (error) {
if (typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
process.nextTick(emit.bind(this, 'error', error));
process.nextTick(FunctionPrototypeBind(emit, this, 'error', error));
} else if (typeof state.closePromiseResolve === 'function') {
state.closePromiseResolve();
}
process.nextTick(emit.bind(this, 'close'));
process.nextTick(FunctionPrototypeBind(emit, this, 'close'));
}

ref() {
Expand Down Expand Up @@ -1714,14 +1739,17 @@ class QuicSession extends EventEmitter {
if (state.handshakeCompletePromise !== undefined)
return state.handshakeCompletePromise;

state.handshakeCompletePromise = new Promise((resolve, reject) => {
state.handshakeCompletePromiseResolve = resolve;
state.handshakeCompletePromiseReject = reject;
}).finally(() => {
state.handshakeCompletePromise = undefined;
state.handshakeCompletePromiseReject = undefined;
state.handshakeCompletePromiseResolve = undefined;
});
state.handshakeCompletePromise = PromisePrototypeFinally(
new Promise((resolve, reject) => {
state.handshakeCompletePromiseResolve = resolve;
state.handshakeCompletePromiseReject = reject;
}),
() => {
state.handshakeCompletePromise = undefined;
state.handshakeCompletePromiseReject = undefined;
state.handshakeCompletePromiseResolve = undefined;
}
);

return state.handshakeCompletePromise;
}
Expand Down Expand Up @@ -1985,7 +2013,7 @@ class QuicSession extends EventEmitter {
if (error) {
if (typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
process.nextTick(emit.bind(this, 'error', error));
process.nextTick(FunctionPrototypeBind(emit, this, 'error', error));
} else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();

Expand All @@ -1994,7 +2022,7 @@ class QuicSession extends EventEmitter {
new ERR_OPERATION_FAILED('Handshake failed'));
}

process.nextTick(emit.bind(this, 'close'));
process.nextTick(FunctionPrototypeBind(emit, this, 'close'));
}

// For server QuicSession instances, true if earlyData is
Expand Down Expand Up @@ -2698,7 +2726,7 @@ class QuicStream extends Duplex {
default:
assert.fail('Invalid headers kind');
}
process.nextTick(emit.bind(this, name, headers, push_id));
process.nextTick(FunctionPrototypeBind(emit, this, name, headers, push_id));
}

[kAfterAsyncWrite]({ bytes }) {
Expand Down Expand Up @@ -2809,7 +2837,7 @@ class QuicStream extends Duplex {
if (!this.destroyed) {
if (!this.detached)
this[kInternalState].sharedState.writeEnded = true;
super.end.apply(this, args);
ReflectApply(super.end, this, args);
}
return this;
}
Expand All @@ -2825,13 +2853,14 @@ class QuicStream extends Duplex {
state.didRead = true;
}

streamOnResume.call(this);
FunctionPrototypeCall(streamOnResume, this);
}

sendFile(path, options = {}) {
if (this.detached)
throw new ERR_INVALID_STATE('Unable to send file');
fs.open(path, 'r', QuicStream[kOnFileOpened].bind(this, options));
fs.open(path, 'r',
FunctionPrototypeBind(QuicStream[kOnFileOpened], this, options));
}

static [kOnFileOpened](options, err, fd) {
Expand All @@ -2847,7 +2876,7 @@ class QuicStream extends Duplex {
}

if (this.destroyed || this.closed) {
fs.close(fd, (err) => { if (err) throw err; });
fs.close(fd, assert.ifError);
return;
}

Expand Down Expand Up @@ -2895,7 +2924,8 @@ class QuicStream extends Duplex {
static [kOnFileUnpipe]() { // Called on the StreamPipe instance.
const stream = this.sink[owner_symbol];
if (stream.ownsFd)
this.source.close().catch(stream.destroy.bind(stream));
PromisePrototypeCatch(this.source.close(),
FunctionPrototypeBind(stream.destroy, stream));
else
this.source.releaseFD();
stream.end();
Expand Down

0 comments on commit d29199e

Please sign in to comment.