Skip to content

Commit

Permalink
quic: use async _construct for QuicStream
Browse files Browse the repository at this point in the history
PR-URL: #34351
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
jasnell committed Jul 23, 2020
1 parent 8bd61d4 commit b06fe33
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 37 deletions.
5 changes: 3 additions & 2 deletions doc/api/quic.md
Expand Up @@ -1047,8 +1047,9 @@ added: REPLACEME

Returns a `Promise` that resolves a new `QuicStream`.

The `Promise` will be rejected if the `QuicSession` has been destroyed or is in
the process of a graceful shutdown.
The `Promise` will be rejected if the `QuicSession` has been destroyed, is in
the process of a graceful shutdown, or the `QuicSession` is otherwise blocked
from opening a new stream.

#### `quicsession.ping()`
<!--YAML
Expand Down
117 changes: 83 additions & 34 deletions lib/internal/quic/core.js
Expand Up @@ -48,7 +48,7 @@ const {
QLogStream,
} = require('internal/quic/util');
const assert = require('internal/assert');
const EventEmitter = require('events');
const { EventEmitter, once } = require('events');
const fs = require('fs');
const fsPromisesInternal = require('internal/fs/promises');
const { Duplex } = require('stream');
Expand Down Expand Up @@ -226,6 +226,7 @@ const kMaybeBind = Symbol('kMaybeBind');
const kOnFileOpened = Symbol('kOnFileOpened');
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
const kOnPipedFileHandleRead = Symbol('kOnPipedFileHandleRead');
const kReady = Symbol('kReady');
const kRemoveSession = Symbol('kRemove');
const kRemoveStream = Symbol('kRemoveStream');
const kServerBusy = Symbol('kServerBusy');
Expand Down Expand Up @@ -2167,30 +2168,15 @@ class QuicSession extends EventEmitter {
defaultEncoding,
} = validateQuicStreamOptions(options);

await this[kHandshakeComplete]();

if (this.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
}
if (this.closing) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is closing`);
}

const handle =
halfOpen ?
_openUnidirectionalStream(this[kHandle]) :
_openBidirectionalStream(this[kHandle]);

if (handle === undefined)
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');

return new QuicStream({
const stream = new QuicStream({
highWaterMark,
defaultEncoding,
readable: !halfOpen
}, this, handle);
}, this);

await once(stream, kReady);

return stream;
}

get duration() {
Expand Down Expand Up @@ -2532,6 +2518,7 @@ function streamOnPause() {
this[kHandle].readStop();
}
class QuicStream extends Duplex {
#count = 0;
[kInternalState] = {
closed: false,
closePromise: undefined,
Expand All @@ -2547,6 +2534,7 @@ class QuicStream extends Duplex {
dataRateHistogram: undefined,
dataSizeHistogram: undefined,
dataAckHistogram: undefined,
ready: false,
sharedState: undefined,
stats: undefined,
};
Expand Down Expand Up @@ -2578,7 +2566,45 @@ class QuicStream extends Duplex {
this._readableState.readingMore = true;
this.on('pause', streamOnPause);

this[kSetHandle](handle);
if (handle !== undefined)
this[kSetHandle](handle);
}

async _construct(callback) {
try {
if (this[kInternalState].ready)
return callback();

// Handle is already initialized
const unidirectional = !this.readable;

await this.session[kHandshakeComplete]();

if (this.destroyed) {
throw new ERR_INVALID_STATE('QuicStream was destroyed');
}
if (this.session.destroyed) {
throw new ERR_INVALID_STATE(
`${this.session.constructor.name} was destroyed`);
}
if (this.session.closing) {
throw new ERR_INVALID_STATE(
`${this.session.constructor.name} is closing`);
}

const handle =
unidirectional ?
_openUnidirectionalStream(this.session[kHandle]) :
_openBidirectionalStream(this.session[kHandle]);

if (handle === undefined)
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');

this[kSetHandle](handle);
callback();
} catch (error) {
callback(error);
}
}

// Set handle is called once the QuicSession has been able
Expand All @@ -2589,6 +2615,8 @@ class QuicStream extends Duplex {
// written will be buffered until kSetHandle is called.
[kSetHandle](handle) {
const state = this[kInternalState];
const current = this[kHandle];
this[kHandle] = handle;
if (handle !== undefined) {
handle.onread = onStreamRead;
handle[owner_symbol] = this;
Expand All @@ -2599,11 +2627,13 @@ class QuicStream extends Duplex {
state.dataAckHistogram = new Histogram(handle.ack);
state.sharedState = new QuicStreamSharedState(handle.state);
state.session[kAddStream](state.id, this);
state.ready = true;
this.emit(kReady);
} else {
if (this[kHandle] !== undefined) {
this[kHandle].stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] =
if (current !== undefined) {
current.stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] =
process.hrtime.bigint();
state.stats = new BigInt64Array(this[kHandle].stats);
state.stats = new BigInt64Array(current.stats);
}
state.sharedState = undefined;
if (state.dataRateHistogram)
Expand All @@ -2613,7 +2643,6 @@ class QuicStream extends Duplex {
if (state.dataAckHistogram)
state.dataAckHistogram[kDestroyHistogram]();
}
this[kHandle] = handle;
}

[kStreamReset](code) {
Expand Down Expand Up @@ -2643,6 +2672,8 @@ class QuicStream extends Duplex {
this.end();
}

// TODO(@jasnell): Investigate later if a Promise version
// of finished() can work here instead.
return promise;
}

Expand All @@ -2663,6 +2694,7 @@ class QuicStream extends Duplex {
else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();

// TODO(@jasnell): Investigate how we can eliminate the nextTick here
process.nextTick(() => callback(error));
}

Expand Down Expand Up @@ -2754,7 +2786,7 @@ class QuicStream extends Duplex {
}

[kWriteGeneric](writev, data, encoding, cb) {
if (this.destroyed)
if (this.destroyed || this.detached)
return; // TODO(addaleax): Can this happen?

this[kUpdateTimer]();
Expand Down Expand Up @@ -2829,6 +2861,8 @@ class QuicStream extends Duplex {
}

sendFile(path, options = {}) {
if (this.detached)
throw new ERR_INVALID_STATE('Unable to send file');
fs.open(path, 'r', QuicStream[kOnFileOpened].bind(this, options));
}

Expand Down Expand Up @@ -2856,6 +2890,9 @@ class QuicStream extends Duplex {
if (this.destroyed || this[kInternalState].closed)
return;

if (this.detached)
throw new ERR_INVALID_STATE('Unable to send file descriptor');

validateInteger(offset, 'options.offset', /* min */ -1);
validateInteger(length, 'options.length', /* min */ -1);

Expand Down Expand Up @@ -2947,6 +2984,12 @@ class QuicStream extends Duplex {
if (this.destroyed)
throw new ERR_INVALID_STATE('QuicStream is already destroyed');

if (this.detached) {
throw new ERR_INVALID_STATE(
'Push stream could not be opened on this QuicSession. ' +
'Push is either disabled or currently blocked.');
}

const state = this[kInternalState];
const {
highWaterMark = state.highWaterMark,
Expand Down Expand Up @@ -2995,9 +3038,11 @@ class QuicStream extends Duplex {
}

submitInformationalHeaders(headers = {}) {
// TODO(@jasnell): Likely better to throw here instead of return false
if (this.destroyed)
return false;
throw new ERR_INVALID_STATE('QuicStream is already destroyed');

if (this.detached)
throw new ERR_INVALID_STATE('Unable to submit headers');

validateObject(headers, 'headers');

Expand Down Expand Up @@ -3025,9 +3070,11 @@ class QuicStream extends Duplex {
}

submitInitialHeaders(headers = {}, options = {}) {
// TODO(@jasnell): Likely better to throw here instead of return false
if (this.destroyed)
return false;
throw new ERR_INVALID_STATE('QuicStream is already destroyed');

if (this.detached)
throw new ERR_INVALID_STATE('Unable to submit headers');

const { terminal } = { ...options };

Expand Down Expand Up @@ -3062,9 +3109,11 @@ class QuicStream extends Duplex {
}

submitTrailingHeaders(headers = {}) {
// TODO(@jasnell): Likely better to throw here instead of return false
if (this.destroyed)
return false;
throw new ERR_INVALID_STATE('QuicStream is already destroyed');

if (this.detached)
throw new ERR_INVALID_STATE('Unable to submit headers');

validateObject(headers, 'headers');

Expand Down
1 change: 1 addition & 0 deletions src/quic/node_quic_buffer.cc
Expand Up @@ -115,6 +115,7 @@ int QuicBuffer::DoPull(
size_t len = 0;
size_t numbytes = 0;
int status = bob::Status::STATUS_CONTINUE;

// There's no data to read.
if (!remaining() || head_ == nullptr) {
status = is_ended() ?
Expand Down
2 changes: 1 addition & 1 deletion src/quic/node_quic_session.cc
Expand Up @@ -1998,7 +1998,7 @@ bool QuicSession::ReceiveStreamData(
const uint8_t* data,
size_t datalen,
uint64_t offset) {
auto leave = OnScopeLeave([=]() {
auto leave = OnScopeLeave([&]() {
// Unconditionally extend the flow control window for the entire
// session but not for the individual Stream.
ExtendOffset(datalen);
Expand Down
1 change: 1 addition & 0 deletions src/stream_base.cc
Expand Up @@ -68,6 +68,7 @@ int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
Local<Object> req_wrap_obj = args[0].As<Object>();

return Shutdown(req_wrap_obj);
}

Expand Down

0 comments on commit b06fe33

Please sign in to comment.