Navigation Menu

Skip to content

Commit

Permalink
quic: implement QuicSession close as promise
Browse files Browse the repository at this point in the history
PR-URL: #34283
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
jasnell committed Jul 16, 2020
1 parent 8e5c5b1 commit 57c1129
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 80 deletions.
9 changes: 4 additions & 5 deletions doc/api/quic.md
Expand Up @@ -352,7 +352,7 @@ Binds the `QuicEndpoint` if it has not already been bound. User code will
not typically be responsible for binding a `QuicEndpoint` as the owning
`QuicSocket` will do that automatically.

* `options` {object}
* `options` {Object}
* `signal` {AbortSignal} Optionally allows the `bind()` to be canceled
using an `AbortController`.
* Returns: {Promise}
Expand Down Expand Up @@ -821,17 +821,16 @@ added: REPLACEME

Information about the cipher algorithm selected for the session.

#### quicsession.close(\[callback\])
#### quicsession.close()
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Callback invoked when the close operation is completed

Begins a graceful close of the `QuicSession`. Existing `QuicStream` instances
will be permitted to close naturally. New `QuicStream` instances will not be
permitted. Once all `QuicStream` instances have closed, the `QuicSession`
instance will be destroyed.
instance will be destroyed. Returns a `Promise` that is resolved once the
`QuicSession` instance is destroyed.

#### quicsession.closeCode
<!-- YAML
Expand Down
151 changes: 76 additions & 75 deletions lib/internal/quic/core.js
Expand Up @@ -18,6 +18,7 @@ const {
Number,
Promise,
PromiseAll,
PromiseReject,
RegExp,
Set,
Symbol,
Expand Down Expand Up @@ -98,7 +99,6 @@ const {
const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_CALLBACK,
ERR_INVALID_STATE,
ERR_OPERATION_FAILED,
ERR_QUIC_FAILED_TO_CREATE_SESSION,
Expand Down Expand Up @@ -795,7 +795,7 @@ class QuicEndpoint {

[kClose]() {
if (this.destroyed) {
return Promise.reject(
return PromiseReject(
new ERR_INVALID_STATE('QuicEndpoint is already destroyed'));
}
const promise = deferredClosePromise(this[kInternalState]);
Expand Down Expand Up @@ -1392,7 +1392,7 @@ class QuicSocket extends EventEmitter {

[kClose]() {
if (this.destroyed) {
return Promise.reject(
return PromiseReject(
new ERR_INVALID_STATE('QuicSocket is already destroyed'));
}
const state = this[kInternalState];
Expand All @@ -1413,14 +1413,13 @@ class QuicSocket extends EventEmitter {
return promise;
}

// Otherwise, loop through each of the known sessions
// and close them.
// TODO(@jasnell): These will be promises soon, but we
// do not want to await them.
for (const session of state.sessions)
session.close();

return promise;
// Otherwise, loop through each of the known sessions and close them.
const reqs = [promise];
for (const session of state.sessions) {
reqs.push(session.close()
.catch((error) => this.destroy(error)));
}
return PromiseAll(reqs);
}

// Initiate an abrupt close and destruction of the QuicSocket.
Expand Down Expand Up @@ -1656,7 +1655,9 @@ class QuicSession extends EventEmitter {
cipherVersion: undefined,
closeCode: NGTCP2_NO_ERROR,
closeFamily: QUIC_ERROR_APPLICATION,
closing: false,
closePromise: undefined,
closePromiseResolve: undefined,
closePromiseReject: undefined,
destroyed: false,
earlyData: false,
handshakeComplete: false,
Expand Down Expand Up @@ -1768,17 +1769,6 @@ class QuicSession extends EventEmitter {
this.destroy(err);
}

// Causes the QuicSession to be immediately destroyed, but with
// additional metadata set.
[kDestroy](code, family, silent, statelessReset) {
const state = this[kInternalState];
state.closeCode = code;
state.closeFamily = family;
state.silentClose = silent;
state.statelessReset = statelessReset;
this.destroy();
}

// Closes the specified stream with the given code. The
// QuicStream object will be destroyed.
[kStreamClose](id, code) {
Expand All @@ -1789,23 +1779,23 @@ class QuicSession extends EventEmitter {
stream.destroy();
}

// Delivers a block of headers to the appropriate QuicStream
// instance. This will only be called if the ALPN selected
// is known to support headers.
[kHeaders](id, headers, kind, push_id) {
[kStreamReset](id, code) {
const stream = this[kInternalState].streams.get(id);
if (stream === undefined)
return;

stream[kHeaders](headers, kind, push_id);
stream[kStreamReset](code);
}

[kStreamReset](id, code) {
// Delivers a block of headers to the appropriate QuicStream
// instance. This will only be called if the ALPN selected
// is known to support headers.
[kHeaders](id, headers, kind, push_id) {
const stream = this[kInternalState].streams.get(id);
if (stream === undefined)
return;

stream[kStreamReset](code);
stream[kHeaders](headers, kind, push_id);
}

[kInspect](depth, options) {
Expand Down Expand Up @@ -1850,8 +1840,13 @@ class QuicSession extends EventEmitter {
if (!this[kHandshakePost]())
return;

process.nextTick(
emit.bind(this, 'secure', servername, alpn, this.cipher));
process.nextTick(() => {
try {
this.emit('secure', servername, alpn, this.cipher);
} catch (error) {
this.destroy(error);
}
});
}

// Non-op for the default case. QuicClientSession
Expand All @@ -1863,10 +1858,10 @@ class QuicSession extends EventEmitter {

[kRemoveStream](stream) {
this[kInternalState].streams.delete(stream.id);
this[kMaybeDestroy]();
}

[kAddStream](id, stream) {
stream.once('close', this[kMaybeDestroy].bind(this));
this[kInternalState].streams.set(id, stream);
}

Expand All @@ -1875,49 +1870,55 @@ class QuicSession extends EventEmitter {
// informationational notification. It is not called on
// server QuicSession instances.
[kUsePreferredAddress](address) {
process.nextTick(
emit.bind(this, 'usePreferredAddress', address));
process.nextTick(() => {
try {
this.emit('usePreferredAddress', address);
} catch (error) {
this.destroy(error);
}
});
}

close() {
return this[kInternalState].closePromise || this[kClose]();
}

[kClose]() {
if (this.destroyed) {
return PromiseReject(
new ERR_INVALID_STATE('QuicSession is already destroyed'));
}
const promise = deferredClosePromise(this[kInternalState]);
if (!this[kMaybeDestroy]()) {
this[kHandle].gracefulClose();
}
return promise;
}

get closing() {
return this[kInternalState].closePromise !== undefined;
}

// The QuicSession will be destroyed if close() has been
// called and there are no remaining streams
[kMaybeDestroy]() {
const state = this[kInternalState];
if (state.closing && state.streams.size === 0) {
if (this.closing && state.streams.size === 0) {
this.destroy();
return true;
}
return false;
}

// Closing allows any existing QuicStream's to gracefully
// complete while disallowing any new QuicStreams from being
// opened (in either direction). Calls to openStream() will
// fail, and new streams from the peer will be rejected/ignored.
close(callback) {
// Causes the QuicSession to be immediately destroyed, but with
// additional metadata set.
[kDestroy](code, family, silent, statelessReset) {
const state = this[kInternalState];
if (state.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
}

if (callback) {
if (typeof callback !== 'function')
throw new ERR_INVALID_CALLBACK();
this.once('close', callback);
}

// If we're already closing, do nothing else.
// Callback will be invoked once the session
// has been destroyed
if (state.closing)
return;
state.closing = true;

// If there are no active streams, we can close immediately,
// otherwise set the graceful close flag.
if (!this[kMaybeDestroy]())
this[kHandle].gracefulClose();
state.closeCode = code;
state.closeFamily = family;
state.silentClose = silent;
state.statelessReset = statelessReset;
this.destroy();
}

// Destroying synchronously shuts down and frees the
Expand All @@ -1939,7 +1940,6 @@ class QuicSession extends EventEmitter {
if (state.destroyed)
return;
state.destroyed = true;
state.closing = false;

// Destroy any pending streams immediately. These
// are streams that have been created but have not
Expand Down Expand Up @@ -1974,7 +1974,13 @@ class QuicSession extends EventEmitter {

// If we are destroying with an error, schedule the
// error to be emitted on process.nextTick.
if (error) process.nextTick(emit.bind(this, 'error', error));
if (error) {
if (typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
process.nextTick(emit.bind(this, 'error', error));
} else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();

process.nextTick(emit.bind(this, 'close'));
}

Expand Down Expand Up @@ -2100,10 +2106,6 @@ class QuicSession extends EventEmitter {
return this[kInternalState].destroyed;
}

get closing() {
return this[kInternalState].closing;
}

get closeCode() {
const state = this[kInternalState];
return {
Expand All @@ -2123,11 +2125,11 @@ class QuicSession extends EventEmitter {

openStream(options) {
const state = this[kInternalState];
if (state.destroyed) {
if (this.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
}
if (state.closing) {
if (this.closing) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is closing`);
}
Expand Down Expand Up @@ -2255,11 +2257,11 @@ class QuicSession extends EventEmitter {
updateKey() {
const state = this[kInternalState];
// Initiates a key update for the connection.
if (state.destroyed) {
if (this.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
}
if (state.closing) {
if (this.closing) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is closing`);
}
Expand All @@ -2282,7 +2284,6 @@ class QuicSession extends EventEmitter {
return this[kHandle].removeFromSocket();
}
}

class QuicServerSession extends QuicSession {
[kInternalServerState] = {
contexts: []
Expand Down Expand Up @@ -2914,7 +2915,6 @@ class QuicStream extends Duplex {

_destroy(error, callback) {
const state = this[kInternalState];
state.session[kRemoveStream](this);
const handle = this[kHandle];
// Do not use handle after this point as the underlying C++
// object has been destroyed. Any attempt to use the object
Expand All @@ -2925,6 +2925,7 @@ class QuicStream extends Duplex {
state.stats = new BigInt64Array(handle.stats);
handle.destroy();
}
state.session[kRemoveStream](this);
// The destroy callback must be invoked in a nextTick
process.nextTick(() => callback(error));
}
Expand Down

0 comments on commit 57c1129

Please sign in to comment.