diff --git a/doc/api/quic.md b/doc/api/quic.md index c877cd12a6f37c..a6cd2ecd91c4b7 100644 --- a/doc/api/quic.md +++ b/doc/api/quic.md @@ -183,10 +183,12 @@ The `openStream()` method is used to create a new `QuicStream`: ```js // Create a new bidirectional stream -const stream1 = await session.openStream(); +async function createStreams(session) { + const stream1 = await session.openStream(); -// Create a new unidirectional stream -const stream2 = await session.openStream({ halfOpen: true }); + // Create a new unidirectional stream + const stream2 = await session.openStream({ halfOpen: true }); +} ``` As suggested by the names, a bidirectional stream allows data to be sent on diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index 9f6545fd23ad1d..2ad5f0cb97a5a7 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -42,6 +42,7 @@ const { validateQuicEndpointOptions, validateCreateSecureContextOptions, validateQuicSocketConnectOptions, + QuicStreamSharedState, QuicSocketSharedState, QuicSessionSharedState, QLogStream, @@ -1792,8 +1793,7 @@ class QuicSession extends EventEmitter { const stream = this[kInternalState].streams.get(id); if (stream === undefined) return; - - stream.destroy(); + stream[kDestroy](code); } [kStreamReset](id, code) { @@ -1968,6 +1968,8 @@ class QuicSession extends EventEmitter { return; state.destroyed = true; + state.idleTimeout = Boolean(this[kInternalState].state?.idleTimeout); + // Destroy any remaining streams immediately. for (const stream of state.streams.values()) stream.destroy(error); @@ -1982,7 +1984,6 @@ class QuicSession extends EventEmitter { handle.stats[IDX_QUIC_SESSION_STATS_DESTROYED_AT] = process.hrtime.bigint(); state.stats = new BigInt64Array(handle.stats); - state.idleTimeout = this[kInternalState].state.idleTimeout; // Destroy the underlying QuicSession handle handle.destroy(state.closeCode, state.closeFamily); @@ -2530,10 +2531,12 @@ function streamOnPause() { if (!this.destroyed) this[kHandle].readStop(); } - class QuicStream extends Duplex { [kInternalState] = { closed: false, + closePromise: undefined, + closePromiseReject: undefined, + closePromiseResolve: undefined, defaultEncoding: undefined, didRead: false, id: undefined, @@ -2544,6 +2547,7 @@ class QuicStream extends Duplex { dataRateHistogram: undefined, dataSizeHistogram: undefined, dataAckHistogram: undefined, + sharedState: undefined, stats: undefined, }; @@ -2563,7 +2567,7 @@ class QuicStream extends Duplex { allowHalfOpen: true, decodeStrings: true, emitClose: true, - autoDestroy: false, + autoDestroy: true, captureRejections: true, }); const state = this[kInternalState]; @@ -2584,7 +2588,6 @@ class QuicStream extends Duplex { // is still minimally usable before this but any data // written will be buffered until kSetHandle is called. [kSetHandle](handle) { - this[kHandle] = handle; const state = this[kInternalState]; if (handle !== undefined) { handle.onread = onStreamRead; @@ -2594,8 +2597,15 @@ class QuicStream extends Duplex { state.dataRateHistogram = new Histogram(handle.rate); state.dataSizeHistogram = new Histogram(handle.size); state.dataAckHistogram = new Histogram(handle.ack); + state.sharedState = new QuicStreamSharedState(handle.state); state.session[kAddStream](state.id, this); } else { + if (this[kHandle] !== undefined) { + this[kHandle].stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] = + process.hrtime.bigint(); + state.stats = new BigInt64Array(this[kHandle].stats); + } + state.sharedState = undefined; if (state.dataRateHistogram) state.dataRateHistogram[kDestroyHistogram](); if (state.dataSizeHistogram) @@ -2603,14 +2613,68 @@ class QuicStream extends Duplex { if (state.dataAckHistogram) state.dataAckHistogram[kDestroyHistogram](); } + this[kHandle] = handle; } [kStreamReset](code) { - this[kInternalState].resetCode = code | 0; + // Receiving a reset from the peer indicates that it is no + // longer sending any data, we can safely close the readable + // side of the Duplex here. + this[kInternalState].resetCode = code; this.push(null); this.read(); } + [kClose]() { + const state = this[kInternalState]; + + if (this.destroyed) { + return PromiseReject( + new ERR_INVALID_STATE('QuicStream is already destroyed')); + } + + const promise = deferredClosePromise(state); + if (this.readable) { + this.push(null); + this.read(); + } + + if (this.writable) { + this.end(); + } + + return promise; + } + + close() { + return this[kInternalState].closePromise || this[kClose](); + } + + _destroy(error, callback) { + const state = this[kInternalState]; + const handle = this[kHandle]; + this[kSetHandle](); + if (handle !== undefined) + handle.destroy(); + state.session[kRemoveStream](this); + + if (error && typeof state.closePromiseReject === 'function') + state.closePromiseReject(error); + else if (typeof state.closePromiseResolve === 'function') + state.closePromiseResolve(); + + process.nextTick(() => callback(error)); + } + + [kDestroy](code) { + // TODO(@jasnell): If code is non-zero, and stream is not otherwise + // naturally shutdown, then we should destroy with an error. + + // Put the QuicStream into detached mode before calling destroy + this[kSetHandle](); + this.destroy(); + } + [kHeaders](headers, kind, push_id) { // TODO(@jasnell): Convert the headers into a proper object let name; @@ -2635,42 +2699,6 @@ class QuicStream extends Duplex { process.nextTick(emit.bind(this, name, headers, push_id)); } - [kClose](family, code) { - const state = this[kInternalState]; - // Trigger the abrupt shutdown of the stream. If the stream is - // already no-longer readable or writable, this does nothing. If - // the stream is readable or writable, then the abort event will - // be emitted immediately after triggering the send of the - // RESET_STREAM and STOP_SENDING frames. The stream will no longer - // be readable or writable, but will not be immediately destroyed - // as we need to wait until ngtcp2 recognizes the stream as - // having been closed to be destroyed. - - // Do nothing if we've already been destroyed - if (this.destroyed || state.closed) - return; - - state.closed = true; - - // Trigger scheduling of the RESET_STREAM and STOP_SENDING frames - // as appropriate. Notify ngtcp2 that the stream is to be shutdown. - // Once sent, the stream will be closed and destroyed as soon as - // the shutdown is acknowledged by the peer. - this[kHandle].resetStream(code, family); - - // Close down the readable side of the stream - if (this.readable) { - this.push(null); - this.read(); - } - - // It is important to call shutdown on the handle before shutting - // down the writable side of the stream in order to prevent an - // empty STREAM frame with fin set to be sent to the peer. - if (this.writable) - this.end(); - } - [kAfterAsyncWrite]({ bytes }) { // TODO(@jasnell): Implement this } @@ -2681,6 +2709,7 @@ class QuicStream extends Duplex { const initiated = this.serverInitiated ? 'server' : 'client'; return customInspect(this, { id: this[kInternalState].id, + detached: this.detached, direction, initiated, writableState: this._writableState, @@ -2699,6 +2728,15 @@ class QuicStream extends Duplex { // TODO(@jasnell): Implement this later } + get detached() { + // The QuicStream is detached if it is yet destroyed + // but the underlying handle is undefined. While in + // detached mode, the QuicStream may still have + // data pending in the read queue, but writes will + // not be permitted. + return this[kHandle] === undefined; + } + get serverInitiated() { return !!(this[kInternalState].id & 0b01); } @@ -2740,20 +2778,40 @@ class QuicStream extends Duplex { // called. By calling shutdown, we're telling // the native side that no more data will be // coming so that a fin stream packet can be - // sent. + // sent, allowing any remaining final stream + // frames to be sent if necessary. + // + // When end() is called, we set the writeEnded + // flag so that we can know earlier when there + // is not going to be any more data being written + // but that is only used when end() is called + // with a final chunk to write. _final(cb) { - const handle = this[kHandle]; - if (handle === undefined) { - cb(); + if (!this.detached) { + const state = this[kInternalState]; + if (state.sharedState?.finSent) + return cb(); + const handle = this[kHandle]; + const req = new ShutdownWrap(); + req.oncomplete = () => { + req.handle = undefined; + cb(); + }; + req.handle = handle; + if (handle.shutdown(req) === 1) + return req.oncomplete(); return; } + return cb(); + } - const req = new ShutdownWrap(); - req.oncomplete = () => cb(); - req.handle = handle; - const err = handle.shutdown(req); - if (err === 1) - return cb(); + end(...args) { + if (!this.destroyed) { + if (!this.detached) + this[kInternalState].sharedState.writeEnded = true; + super.end.apply(this, args); + } + return this; } _read(nread) { @@ -2809,11 +2867,6 @@ class QuicStream extends Duplex { this[kUpdateTimer](); this.ownsFd = ownsFd; - // Close the writable side of the stream, but only as far as the writable - // stream implementation is concerned. - this._final = null; - this.end(); - defaultTriggerAsyncIdScope(this[async_id_symbol], QuicStream[kStartFilePipe], this, fd, offset, length); @@ -2840,6 +2893,7 @@ class QuicStream extends Duplex { this.source.close().catch(stream.destroy.bind(stream)); else this.source.releaseFD(); + stream.end(); } static [kOnPipedFileHandleRead]() { @@ -2869,35 +2923,14 @@ class QuicStream extends Duplex { return this[kInternalState].push_id; } - close(code) { - this[kClose](QUIC_ERROR_APPLICATION, code); + _onTimeout() { + // TODO(@jasnell): Implement this } get session() { return this[kInternalState].session; } - _destroy(error, callback) { - const state = this[kInternalState]; - const handle = this[kHandle]; - // Do not use handle after this point as the underlying C++ - // object has been destroyed. Any attempt to use the object - // will segfault and crash the process. - if (handle !== undefined) { - handle.stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] = - process.hrtime.bigint(); - state.stats = new BigInt64Array(handle.stats); - handle.destroy(); - } - state.session[kRemoveStream](this); - // The destroy callback must be invoked in a nextTick - process.nextTick(() => callback(error)); - } - - _onTimeout() { - // TODO(@jasnell): Implement this - } - get dataRateHistogram() { return this[kInternalState].dataRateHistogram; } diff --git a/lib/internal/quic/util.js b/lib/internal/quic/util.js index 0f61af140140cf..f46a8714e00a20 100644 --- a/lib/internal/quic/util.js +++ b/lib/internal/quic/util.js @@ -86,6 +86,13 @@ const { IDX_QUICSOCKET_STATE_SERVER_BUSY, IDX_QUICSOCKET_STATE_STATELESS_RESET_DISABLED, + IDX_QUICSTREAM_STATE_WRITE_ENDED, + IDX_QUICSTREAM_STATE_READ_STARTED, + IDX_QUICSTREAM_STATE_READ_PAUSED, + IDX_QUICSTREAM_STATE_READ_ENDED, + IDX_QUICSTREAM_STATE_FIN_SENT, + IDX_QUICSTREAM_STATE_FIN_RECEIVED, + IDX_HTTP3_QPACK_MAX_TABLE_CAPACITY, IDX_HTTP3_QPACK_BLOCKED_STREAMS, IDX_HTTP3_MAX_HEADER_LIST_SIZE, @@ -806,6 +813,48 @@ function toggleListeners(state, event, on) { } } +class QuicStreamSharedState { + constructor(state) { + this[kHandle] = Buffer.from(state); + } + + get writeEnded() { + return Boolean(this[kHandle].readUInt8(IDX_QUICSTREAM_STATE_WRITE_ENDED)); + } + + set writeEnded(on) { + this[kHandle].writeUInt8(on ? 1 : 0, IDX_QUICSTREAM_STATE_WRITE_ENDED); + } + + get readStarted() { + return Boolean(this[kHandle].readUInt8(IDX_QUICSTREAM_STATE_READ_STARTED)); + } + + get readPaused() { + return Boolean(this[kHandle].readUInt8(IDX_QUICSTREAM_STATE_READ_PAUSED)); + } + + set readPaused(on) { + this[kHandle].writeUInt8(on ? 1 : 0, IDX_QUICSTREAM_STATE_READ_PAUSED); + } + + get readEnded() { + return Boolean(this[kHandle].readUInt8(IDX_QUICSTREAM_STATE_READ_ENDED)); + } + + set readEnded(on) { + this[kHandle].writeUInt8(on ? 1 : 0, IDX_QUICSTREAM_STATE_READ_ENDED); + } + + get finSent() { + return Boolean(this[kHandle].readUInt8(IDX_QUICSTREAM_STATE_FIN_SENT)); + } + + get finReceived() { + return Boolean(this[kHandle].readUInt8(IDX_QUICSTREAM_STATE_FIN_RECEIVED)); + } +} + class QuicSocketSharedState { constructor(state) { this[kHandle] = Buffer.from(state); @@ -986,6 +1035,7 @@ module.exports = { validateQuicEndpointOptions, validateCreateSecureContextOptions, validateQuicSocketConnectOptions, + QuicStreamSharedState, QuicSocketSharedState, QuicSessionSharedState, QLogStream, diff --git a/src/quic/node_quic.cc b/src/quic/node_quic.cc index a7baf1aa6df559..29473492198d61 100644 --- a/src/quic/node_quic.cc +++ b/src/quic/node_quic.cc @@ -210,6 +210,11 @@ void Initialize(Local target, QUICSOCKET_SHARED_STATE(V) #undef V +#define V(id, _, __) \ + NODE_DEFINE_CONSTANT(constants, IDX_QUICSTREAM_STATE_##id); + QUICSTREAM_SHARED_STATE(V) +#undef V + #define V(name, _, __) \ NODE_DEFINE_CONSTANT(constants, IDX_QUIC_SESSION_STATS_##name); SESSION_STATS(V) diff --git a/src/quic/node_quic_buffer.cc b/src/quic/node_quic_buffer.cc index ebe230271ac846..19514e75f663fe 100644 --- a/src/quic/node_quic_buffer.cc +++ b/src/quic/node_quic_buffer.cc @@ -115,7 +115,6 @@ int QuicBuffer::DoPull( size_t len = 0; size_t numbytes = 0; int status = bob::Status::STATUS_CONTINUE; - // There's no data to read. if (!remaining() || head_ == nullptr) { status = is_ended() ? diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index 5af8dc098d6027..5005d3d6919097 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -39,44 +39,34 @@ int IsEmpty(const ngtcp2_vec* vec, size_t cnt) { } // anonymous namespace DefaultApplication::DefaultApplication( - QuicSession* session) : - QuicApplication(session) {} + QuicSession* session) + : QuicApplication(session) {} bool DefaultApplication::Initialize() { if (needs_init()) { Debug(session(), "Default QUIC Application Initialized"); set_init_done(); } - return needs_init(); + return true; } void DefaultApplication::ScheduleStream(int64_t stream_id) { BaseObjectPtr stream = session()->FindStream(stream_id); - Debug(session(), "Scheduling stream %" PRIu64, stream_id); - if (LIKELY(stream)) + if (LIKELY(stream && !stream->is_destroyed())) { + Debug(session(), "Scheduling stream %" PRIu64, stream_id); stream->Schedule(&stream_queue_); + } } void DefaultApplication::UnscheduleStream(int64_t stream_id) { BaseObjectPtr stream = session()->FindStream(stream_id); - Debug(session(), "Unscheduling stream %" PRIu64, stream_id); - if (LIKELY(stream)) + if (LIKELY(stream)) { + Debug(session(), "Unscheduling stream %" PRIu64, stream_id); stream->Unschedule(); -} - -void DefaultApplication::StreamClose( - int64_t stream_id, - uint64_t app_error_code) { - if (!session()->HasStream(stream_id)) - return; - if (app_error_code == 0) - app_error_code = NGTCP2_APP_NOERROR; - UnscheduleStream(stream_id); - QuicApplication::StreamClose(stream_id, app_error_code); + } } void DefaultApplication::ResumeStream(int64_t stream_id) { - Debug(session(), "Stream %" PRId64 " has data to send", stream_id); ScheduleStream(stream_id); } @@ -93,10 +83,7 @@ bool DefaultApplication::ReceiveStreamData( if (!stream) { // Shutdown the stream explicitly if the session is being closed. if (session()->is_graceful_closing()) { - ngtcp2_conn_shutdown_stream( - session()->connection(), - stream_id, - NGTCP2_ERR_CLOSING); + session()->ShutdownStream(stream_id, NGTCP2_ERR_CLOSING); return true; } @@ -113,6 +100,11 @@ bool DefaultApplication::ReceiveStreamData( } CHECK(stream); + // If the stream ended up being destroyed immediately after + // creation, just skip the data processing and return. + if (UNLIKELY(stream->is_destroyed())) + return true; + stream->ReceiveData(flags, data, datalen, offset); return true; } @@ -141,6 +133,7 @@ int DefaultApplication::GetStreamData(StreamData* stream_data) { case bob::Status::STATUS_END: stream_data->fin = 1; } + stream_data->count = count; if (count > 0) { diff --git a/src/quic/node_quic_default_application.h b/src/quic/node_quic_default_application.h index d2dc1bfd7a5cc3..52c9044c0e3828 100644 --- a/src/quic/node_quic_default_application.h +++ b/src/quic/node_quic_default_application.h @@ -38,7 +38,6 @@ class DefaultApplication final : public QuicApplication { int GetStreamData(StreamData* stream_data) override; void ResumeStream(int64_t stream_id) override; - void StreamClose(int64_t stream_id, uint64_t app_error_code) override; bool ShouldSetFin(const StreamData& stream_data) override; bool StreamCommit(StreamData* stream_data, size_t datalen) override; diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index 150734bb81c925..e05ad316341c66 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -596,7 +596,6 @@ void Http3Application::StreamClosed( BaseObjectPtr stream = session()->FindStream(stream_id); if (stream) stream->ReceiveData(1, nullptr, 0, 0); - session()->listener()->OnStreamClose(stream_id, app_error_code); } BaseObjectPtr Http3Application::FindOrCreateStream( diff --git a/src/quic/node_quic_session-inl.h b/src/quic/node_quic_session-inl.h index f960e03cc98d6f..4ab084463b37e5 100644 --- a/src/quic/node_quic_session-inl.h +++ b/src/quic/node_quic_session-inl.h @@ -207,32 +207,6 @@ void QuicApplication::set_stream_fin(int64_t stream_id) { stream->set_fin_sent(); } -ssize_t QuicApplication::WriteVStream( - QuicPathStorage* path, - uint8_t* buf, - ssize_t* ndatalen, - const StreamData& stream_data) { - CHECK_LE(stream_data.count, kMaxVectorCount); - - uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE; - if (stream_data.remaining > 0) - flags |= NGTCP2_WRITE_STREAM_FLAG_MORE; - if (stream_data.fin) - flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; - - return ngtcp2_conn_writev_stream( - session()->connection(), - &path->path, - buf, - session()->max_packet_length(), - ndatalen, - flags, - stream_data.id, - stream_data.buf, - stream_data.count, - uv_hrtime()); -} - std::unique_ptr QuicApplication::CreateStreamDataPacket() { return QuicPacket::Create( "stream data", @@ -243,45 +217,6 @@ Environment* QuicApplication::env() const { return session()->env(); } -// Every QUIC session will have multiple CIDs associated with it. -void QuicSession::AssociateCID(const QuicCID& cid) { - socket()->AssociateCID(cid, scid_); -} - -void QuicSession::DisassociateCID(const QuicCID& cid) { - if (is_server()) - socket()->DisassociateCID(cid); -} - -void QuicSession::ExtendMaxStreamData(int64_t stream_id, uint64_t max_data) { - Debug(this, - "Extending max stream %" PRId64 " data to %" PRIu64, - stream_id, max_data); - application_->ExtendMaxStreamData(stream_id, max_data); -} - -void QuicSession::ExtendMaxStreamsRemoteUni(uint64_t max_streams) { - Debug(this, "Extend remote max unidirectional streams: %" PRIu64, - max_streams); - application_->ExtendMaxStreamsRemoteUni(max_streams); -} - -void QuicSession::ExtendMaxStreamsRemoteBidi(uint64_t max_streams) { - Debug(this, "Extend remote max bidirectional streams: %" PRIu64, - max_streams); - application_->ExtendMaxStreamsRemoteBidi(max_streams); -} - -void QuicSession::ExtendMaxStreamsUni(uint64_t max_streams) { - Debug(this, "Setting max unidirectional streams to %" PRIu64, max_streams); - state_->max_streams_uni = max_streams; -} - -void QuicSession::ExtendMaxStreamsBidi(uint64_t max_streams) { - Debug(this, "Setting max bidirectional streams to %" PRIu64, max_streams); - state_->max_streams_bidi = max_streams; -} - // Extends the stream-level flow control by the given number of bytes. void QuicSession::ExtendStreamOffset(int64_t stream_id, size_t amount) { Debug(this, "Extending max stream %" PRId64 " offset by %" PRId64 " bytes", @@ -311,32 +246,19 @@ uint32_t QuicSession::negotiated_version() const { return ngtcp2_conn_get_negotiated_version(connection()); } -// The HandshakeCompleted function is called by ngtcp2 once it -// determines that the TLS Handshake is done. The only thing we -// need to do at this point is let the javascript side know. -void QuicSession::HandshakeCompleted() { - RemoteTransportParamsDebug transport_params(this); - Debug(this, "Handshake is completed. %s", transport_params); - RecordTimestamp(&QuicSessionStats::handshake_completed_at); - if (is_server()) HandshakeConfirmed(); - listener()->OnHandshakeCompleted(); -} - -void QuicSession::HandshakeConfirmed() { - Debug(this, "Handshake is confirmed"); - RecordTimestamp(&QuicSessionStats::handshake_confirmed_at); - state_->handshake_confirmed = 1; -} - bool QuicSession::is_handshake_completed() const { DCHECK(!is_destroyed()); return ngtcp2_conn_get_handshake_completed(connection()); } -void QuicSession::InitApplication() { - Debug(this, "Initializing application handler for ALPN %s", - alpn().c_str() + 1); - application_->Initialize(); +void QuicSession::ShutdownStream(int64_t stream_id, uint64_t code) { + if (is_in_closing_period() || + is_in_draining_period() || + is_silent_closing()) { + return; // Nothing to do because we can't send any frames. + } + SendSessionScope send_scope(this); + ngtcp2_conn_shutdown_stream(connection(), stream_id, 0); } // When a QuicSession hits the idle timeout, it is to be silently and @@ -354,34 +276,10 @@ void QuicSession::OnIdleTimeout() { } } -// Captures the error code and family information from a received -// connection close frame. -void QuicSession::GetConnectionCloseInfo() { - ngtcp2_connection_close_error_code close_code; - ngtcp2_conn_get_connection_close_error_code(connection(), &close_code); - set_last_error(QuicError(close_code)); -} - -// Removes the given connection id from the QuicSession. -void QuicSession::RemoveConnectionID(const QuicCID& cid) { - if (!is_destroyed()) - DisassociateCID(cid); -} - QuicCID QuicSession::dcid() const { return QuicCID(ngtcp2_conn_get_dcid(connection())); } -// The retransmit timer allows us to trigger retransmission -// of packets in case they are considered lost. The exact amount -// of time is determined internally by ngtcp2 according to the -// guidelines established by the QUIC spec but we use a libuv -// timer to actually monitor. Here we take the calculated timeout -// and extend out the libuv timer. -void QuicSession::UpdateRetransmitTimer(uint64_t timeout) { - retransmit_.Update(timeout, timeout); -} - void QuicSession::CheckAllocatedSize(size_t previous_size) const { CHECK_GE(current_ngtcp2_memory_, previous_size); } @@ -460,6 +358,14 @@ void QuicSession::set_connection_id_strategy(ConnectionIDStrategy strategy) { connection_id_strategy_ = strategy; } +bool QuicSession::is_unable_to_send_packets() { + return NgCallbackScope::InNgCallbackScope(this) || + is_destroyed() || + is_in_draining_period() || + (is_server() && is_in_closing_period()) || + socket() == nullptr; +} + void QuicSession::set_preferred_address_strategy( PreferredAddressStrategy strategy) { preferred_address_strategy_ = strategy; @@ -502,11 +408,6 @@ bool QuicSession::SendPacket( return SendPacket(std::move(packet)); } -void QuicSession::set_local_address(const ngtcp2_addr* addr) { - DCHECK(!is_destroyed()); - ngtcp2_conn_set_local_addr(connection(), addr); -} - // Set the transport parameters received from the remote peer void QuicSession::set_remote_transport_params() { DCHECK(!is_destroyed()); @@ -514,48 +415,6 @@ void QuicSession::set_remote_transport_params() { set_transport_params_set(); } -void QuicSession::StopIdleTimer() { - idle_.Close(); -} - -void QuicSession::StopRetransmitTimer() { - retransmit_.Close(); -} - -// Called by the OnVersionNegotiation callback when a version -// negotiation frame has been received by the client. The sv -// parameter is an array of versions supported by the remote peer. -void QuicSession::VersionNegotiation(const uint32_t* sv, size_t nsv) { - CHECK(!is_server()); - if (!is_destroyed()) - listener()->OnVersionNegotiation(NGTCP2_PROTO_VER, sv, nsv); -} - -// Every QUIC session has a remote address and local address. -// Those endpoints can change through the lifetime of a connection, -// so whenever a packet is successfully processed, or when a -// response is to be sent, we have to keep track of the path -// and update as we go. -void QuicSession::UpdateEndpoint(const ngtcp2_path& path) { - remote_address_.Update(path.remote.addr, path.remote.addrlen); - local_address_.Update(path.local.addr, path.local.addrlen); - - // If the updated remote address is IPv6, set the flow label - if (remote_address_.family() == AF_INET6) { - // TODO(@jasnell): Currently, this reuses the session reset secret. - // That may or may not be a good idea, we need to verify and may - // need to have a distinct secret for flow labels. - uint32_t flow_label = - GenerateFlowLabel( - local_address_, - remote_address_, - scid_, - socket()->session_reset_secret(), - NGTCP2_STATELESS_RESET_TOKENLEN); - remote_address_.set_flow_label(flow_label); - } -} - // Submits information headers only if the selected application // supports headers. bool QuicSession::SubmitInformation( diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index 212bf3dae988d1..1da8f7bfaab6bd 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -59,6 +59,14 @@ using v8::Value; namespace quic { +typedef ssize_t(*ngtcp2_close_fn)( + ngtcp2_conn* conn, + ngtcp2_path* path, + uint8_t* dest, + size_t destlen, + uint64_t error_code, + ngtcp2_tstamp ts); + namespace { void SetConfig(QuicState* quic_state, int idx, uint64_t* val) { AliasedFloat64Array& buffer = quic_state->quicsessionconfig_buffer; @@ -91,6 +99,12 @@ void CopyPreferredAddress( *port = SocketAddress::GetPort(addr); } +ngtcp2_close_fn SelectCloseFn(uint32_t family) { + return family == QUIC_ERROR_APPLICATION ? + ngtcp2_conn_write_application_close : + ngtcp2_conn_write_connection_close; +} + } // namespace std::string QuicSession::RemoteTransportParamsDebug::ToString() const { @@ -1000,11 +1014,6 @@ bool QuicCryptoContext::OnSecrets( const uint8_t* tx_secret, size_t secretlen) { - auto maybe_init_app = OnScopeLeave([&]() { - if (level == NGTCP2_CRYPTO_LEVEL_APP) - session()->InitApplication(); - }); - Debug(session(), "Received secrets for %s crypto level", crypto_level_name(level)); @@ -1012,8 +1021,11 @@ bool QuicCryptoContext::OnSecrets( if (!SetSecrets(level, rx_secret, tx_secret, secretlen)) return false; - if (level == NGTCP2_CRYPTO_LEVEL_APP) + if (level == NGTCP2_CRYPTO_LEVEL_APP) { session_->set_remote_transport_params(); + if (!session()->InitApplication()) + return false; + } return true; } @@ -1289,15 +1301,37 @@ bool QuicApplication::SendPendingData() { return true; } +ssize_t QuicApplication::WriteVStream( + QuicPathStorage* path, + uint8_t* buf, + ssize_t* ndatalen, + const StreamData& stream_data) { + CHECK_LE(stream_data.count, kMaxVectorCount); + + uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE; + if (stream_data.remaining > 0) + flags |= NGTCP2_WRITE_STREAM_FLAG_MORE; + if (stream_data.fin) + flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; + + return ngtcp2_conn_writev_stream( + session()->connection(), + &path->path, + buf, + session()->max_packet_length(), + ndatalen, + flags, + stream_data.id, + stream_data.buf, + stream_data.count, + uv_hrtime()); +} + void QuicApplication::MaybeSetFin(const StreamData& stream_data) { if (ShouldSetFin(stream_data)) set_stream_fin(stream_data.id); } -void QuicApplication::StreamOpen(int64_t stream_id) { - Debug(session(), "QUIC Stream %" PRId64 " is open", stream_id); -} - void QuicApplication::StreamHeaders( int64_t stream_id, int kind, @@ -1309,7 +1343,14 @@ void QuicApplication::StreamHeaders( void QuicApplication::StreamClose( int64_t stream_id, uint64_t app_error_code) { - session()->listener()->OnStreamClose(stream_id, app_error_code); + BaseObjectPtr stream = session()->FindStream(stream_id); + if (stream) { + CHECK(!stream->is_destroyed()); // Should not be possible + stream->set_destroyed(); + stream->CancelPendingWrites(); + session()->RemoveStream(stream_id); + session()->listener()->OnStreamClose(stream_id, app_error_code); + } } void QuicApplication::StreamReset( @@ -1423,7 +1464,7 @@ QuicSession::QuicSession( alpn_(alpn), hostname_(hostname), idle_(socket->env(), [this]() { OnIdleTimeout(); }), - retransmit_(socket->env(), [this]() { MaybeTimeout(); }), + retransmit_(socket->env(), [this]() { OnRetransmitTimeout(); }), dcid_(dcid), state_(env()->isolate()), quic_state_(socket->quic_state()) { @@ -1452,7 +1493,7 @@ QuicSession::QuicSession( } QuicSession::~QuicSession() { - CHECK(!Ngtcp2CallbackScope::InNgtcp2CallbackScope(this)); + CHECK(!NgCallbackScope::InNgCallbackScope(this)); // The next write should be the final one if (qlog_stream_) @@ -1466,8 +1507,8 @@ QuicSession::~QuicSession() { // In a clean shutdown, using Close(), these will have already been // stopped, but if Close() was not called and we're being destroyed // in GC, for instance, we need to make sure they get stopped here. - StopIdleTimer(); - StopRetransmitTimer(); + idle_.Stop(); + retransmit_.Stop(); DebugStats(); } @@ -1532,20 +1573,15 @@ void QuicSession::AckedStreamDataOffset( int64_t stream_id, uint64_t offset, uint64_t datalen) { - // It is possible for the QuicSession to have been destroyed but not yet - // deconstructed. In such cases, we want to ignore the callback as there - // is nothing to do but wait for further cleanup to happen. - if (LIKELY(!is_destroyed())) { - Debug(this, - "Received acknowledgement for %" PRIu64 - " bytes of stream %" PRId64 " data", - datalen, stream_id); + Debug(this, + "Received acknowledgement for %" PRIu64 + " bytes of stream %" PRId64 " data", + datalen, stream_id); - application_->AcknowledgeStreamData( - stream_id, - offset, - static_cast(datalen)); - } + application_->AcknowledgeStreamData( + stream_id, + offset, + static_cast(datalen)); } // Attaches the session to the given QuicSocket. The complexity @@ -1704,8 +1740,8 @@ void QuicSession::Destroy() { CHECK(streams_.empty()); // Stop and free the idle and retransmission timers if they are active. - StopIdleTimer(); - StopRetransmitTimer(); + idle_.Stop(); + retransmit_.Stop(); // The QuicSession instances are kept alive using // BaseObjectPtr. The only persistent BaseObjectPtr @@ -1721,18 +1757,15 @@ void QuicSession::Destroy() { // Generates and associates a new connection ID for this QuicSession. // ngtcp2 will call this multiple times at the start of a new connection // in order to build a pool of available CIDs. -bool QuicSession::GetNewConnectionID( +void QuicSession::GetNewConnectionID( ngtcp2_cid* cid, uint8_t* token, size_t cidlen) { - if (is_destroyed()) - return false; CHECK_NOT_NULL(connection_id_strategy_); connection_id_strategy_(this, cid, cidlen); QuicCID cid_(cid); StatelessResetToken(token, socket()->session_reset_secret(), cid_); - AssociateCID(cid_); - return true; + socket()->AssociateCID(cid_, scid_); } void QuicSession::HandleError() { @@ -1748,10 +1781,10 @@ void QuicSession::HandleError() { UpdateClosingTimer(); } -// The the retransmit libuv timer fires, it will call MaybeTimeout, +// The the retransmit libuv timer fires, it will call OnRetransmitTimeout, // which determines whether or not we need to retransmit data to // to packet loss or ack delay. -void QuicSession::MaybeTimeout() { +void QuicSession::OnRetransmitTimeout() { if (is_destroyed()) return; uint64_t now = uv_hrtime(); @@ -1787,10 +1820,7 @@ bool QuicSession::OpenUnidirectionalStream(int64_t* stream_id) { DCHECK(!is_destroyed()); DCHECK(!is_closing()); DCHECK(!is_graceful_closing()); - if (ngtcp2_conn_open_uni_stream(connection(), stream_id, nullptr)) - return false; - ngtcp2_conn_shutdown_stream_read(connection(), *stream_id, 0); - return true; + return ngtcp2_conn_open_uni_stream(connection(), stream_id, nullptr) == 0; } // When ngtcp2 receives a successfull response to a PATH_CHALLENGE, @@ -1825,7 +1855,7 @@ void QuicSession::PathValidation( // is called while processing an ngtcp2 callback, or if the // closing or draining period has started, this is a non-op. void QuicSession::Ping() { - if (Ngtcp2CallbackScope::InNgtcp2CallbackScope(this) || + if (NgCallbackScope::InNgCallbackScope(this) || is_destroyed() || is_closing() || is_in_closing_period() || @@ -1848,16 +1878,14 @@ bool QuicSession::Receive( const SocketAddress& local_addr, const SocketAddress& remote_addr, unsigned int flags) { - if (is_destroyed()) { - Debug(this, "Ignoring packet because session is destroyed"); - return false; - } + + CHECK(!is_destroyed()); Debug(this, "Receiving QUIC packet"); IncrementStat(&QuicSessionStats::bytes_received, nread); if (is_in_closing_period() && is_server()) { - Debug(this, "QUIC packet received while in closing period"); + Debug(this, "Packet received while in closing period"); IncrementConnectionCloseAttempts(); // For server QuicSession instances, we serialize the connection close // packet once but may sent it multiple times. If the client keeps @@ -1868,7 +1896,7 @@ bool QuicSession::Receive( // close frame sent with every one we send. if (UNLIKELY(ShouldAttemptConnectionClose() && !SendConnectionClose())) { - Debug(this, "Failure trying to send another connection close"); + Debug(this, "Failure sending another connection close"); return false; } } @@ -1881,7 +1909,6 @@ bool QuicSession::Receive( auto update_stats = OnScopeLeave([&](){ UpdateDataStats(); }); - Debug(this, "Processing received packet"); HandleScope handle_scope(env()->isolate()); InternalCallbackScope callback_scope(this); remote_address_ = remote_addr; @@ -1906,9 +1933,10 @@ bool QuicSession::Receive( return true; } - Debug(this, "Sending pending data after processing packet"); + if (!is_destroyed()) + UpdateIdleTimer(); + SendPendingData(); - UpdateIdleTimer(); UpdateRecoveryStats(); Debug(this, "Successfully processed received packet"); return true; @@ -1921,20 +1949,14 @@ bool QuicSession::ReceivePacket( ngtcp2_path* path, const uint8_t* data, ssize_t nread) { - DCHECK(!Ngtcp2CallbackScope::InNgtcp2CallbackScope(this)); - - // If the QuicSession has been destroyed, we're not going - // to process any more packets for it. - if (is_destroyed()) - return true; + CHECK(!is_destroyed()); uint64_t now = uv_hrtime(); SetStat(&QuicSessionStats::received_at, now); int err = ngtcp2_conn_read_pkt(connection(), path, data, nread, now); if (err < 0) { switch (err) { - // In either of the following two cases, the caller will - // handle the next steps... + case NGTCP2_ERR_CALLBACK_FAILURE: case NGTCP2_ERR_DRAINING: case NGTCP2_ERR_RECV_VERSION_NEGOTIATION: break; @@ -1953,15 +1975,15 @@ bool QuicSession::ReceivePacket( } } - if (is_destroyed()) { - Debug(this, "Session was destroyed while processing the received packet"); - // If the QuicSession has been destroyed but it is not - // in the closing period, a CONNECTION_CLOSE has not yet - // been sent to the peer. Let's attempt to send one. This - // will have the effect of setting the idle timer to the - // closing/draining period, after which the QuicSession - // will be destroyed. - return is_in_closing_period() ? true : SendConnectionClose(); + // If the QuicSession has been destroyed but it is not + // in the closing period, a CONNECTION_CLOSE has not yet + // been sent to the peer. Let's attempt to send one. This + // will have the effect of setting the idle timer to the + // closing/draining period, after which the QuicSession + // will be destroyed. + if (is_destroyed() && !is_in_closing_period()) { + Debug(this, "Session was destroyed while processing the packet"); + return SendConnectionClose(); } return true; @@ -1976,31 +1998,12 @@ bool QuicSession::ReceiveStreamData( const uint8_t* data, size_t datalen, uint64_t offset) { - auto leave = OnScopeLeave([&]() { - // This extends the flow control window for the entire session - // but not for the individual Stream. Stream flow control is - // only expanded as data is read on the JavaScript side. - // TODO(jasnell): The strategy for extending the flow control - // window is going to be revisited soon. We don't need to - // extend on every chunk of data. + auto leave = OnScopeLeave([=]() { + // Unconditionally extend the flow control window for the entire + // session but not for the individual Stream. ExtendOffset(datalen); }); - // QUIC does not permit zero-length stream packets if - // fin is not set. ngtcp2 prevents these from coming - // through but just in case of regression in that impl, - // let's double check and simply ignore such packets - // so we do not commit any resources. - if (UNLIKELY(!(flags & NGTCP2_STREAM_DATA_FLAG_FIN) && datalen == 0)) - return true; - - if (is_destroyed()) - return false; - - HandleScope scope(env()->isolate()); - Context::Scope context_scope(env()->context()); - - // From here, we defer to the QuicApplication specific processing logic return application_->ReceiveStreamData( flags, stream_id, @@ -2053,7 +2056,10 @@ void QuicSession::RemoveStream(int64_t stream_id) { // except in very specific conditions, none of which apply // once we've gotten this far. We need to manually extend when // a remote peer initiated stream is removed. - if (!ngtcp2_conn_is_local_stream(connection_.get(), stream_id)) { + if (!is_in_draining_period() && + !is_in_closing_period() && + !is_silent_closing() && + !ngtcp2_conn_is_local_stream(connection_.get(), stream_id)) { if (ngtcp2_is_bidi_stream(stream_id)) ngtcp2_conn_extend_max_streams_bidi(connection_.get(), 1); else @@ -2063,13 +2069,6 @@ void QuicSession::RemoveStream(int64_t stream_id) { // This will have the side effect of destroying the QuicStream // instance. streams_.erase(stream_id); - // Ensure that the stream state is closed and discarded by ngtcp2 - // Be sure to call this after removing the stream from the map - // above so that when ngtcp2 closes the stream, the callback does - // not attempt to loop back around and destroy the already removed - // QuicStream instance. Typically, the stream is already going to - // be closed by this point. - ngtcp2_conn_shutdown_stream(connection(), stream_id, NGTCP2_NO_ERROR); } // The retransmit timer allows us to trigger retransmission @@ -2086,11 +2085,101 @@ void QuicSession::ScheduleRetransmit() { UpdateRetransmitTimer(interval); } +bool QuicSession::InitApplication() { + Debug(this, "Initializing application handler for ALPN %s", + alpn().c_str() + 1); + return application_->Initialize(); +} + +// Captures the error code and family information from a received +// connection close frame. +void QuicSession::GetConnectionCloseInfo() { + ngtcp2_connection_close_error_code close_code; + ngtcp2_conn_get_connection_close_error_code(connection(), &close_code); + set_last_error(QuicError(close_code)); +} + +// The HandshakeCompleted function is called by ngtcp2 once it +// determines that the TLS Handshake is done. The only thing we +// need to do at this point is let the javascript side know. +void QuicSession::HandshakeCompleted() { + RemoteTransportParamsDebug transport_params(this); + Debug(this, "Handshake is completed. %s", transport_params); + RecordTimestamp(&QuicSessionStats::handshake_completed_at); + if (is_server()) HandshakeConfirmed(); + listener()->OnHandshakeCompleted(); +} + +void QuicSession::HandshakeConfirmed() { + Debug(this, "Handshake is confirmed"); + RecordTimestamp(&QuicSessionStats::handshake_confirmed_at); + state_->handshake_confirmed = 1; +} + +// Every QUIC session has a remote address and local address. +// Those endpoints can change through the lifetime of a connection, +// so whenever a packet is successfully processed, or when a +// response is to be sent, we have to keep track of the path +// and update as we go. +void QuicSession::UpdateEndpoint(const ngtcp2_path& path) { + remote_address_.Update(path.remote.addr, path.remote.addrlen); + local_address_.Update(path.local.addr, path.local.addrlen); + + // If the updated remote address is IPv6, set the flow label + if (remote_address_.family() == AF_INET6) { + // TODO(@jasnell): Currently, this reuses the session reset secret. + // That may or may not be a good idea, we need to verify and may + // need to have a distinct secret for flow labels. + uint32_t flow_label = + GenerateFlowLabel( + local_address_, + remote_address_, + scid_, + socket()->session_reset_secret(), + NGTCP2_STATELESS_RESET_TOKENLEN); + remote_address_.set_flow_label(flow_label); + } +} + +// Called by the OnVersionNegotiation callback when a version +// negotiation frame has been received by the client. The sv +// parameter is an array of versions supported by the remote peer. +void QuicSession::VersionNegotiation(const uint32_t* sv, size_t nsv) { + CHECK(!is_server()); + listener()->OnVersionNegotiation(NGTCP2_PROTO_VER, sv, nsv); +} + +// The retransmit timer allows us to trigger retransmission +// of packets in case they are considered lost. The exact amount +// of time is determined internally by ngtcp2 according to the +// guidelines established by the QUIC spec but we use a libuv +// timer to actually monitor. Here we take the calculated timeout +// and extend out the libuv timer. +void QuicSession::UpdateRetransmitTimer(uint64_t timeout) { + retransmit_.Update(timeout, timeout); +} + +void QuicSession::IncrementConnectionCloseAttempts() { + if (connection_close_attempts_ < kMaxSizeT) + connection_close_attempts_++; +} + +bool QuicSession::ShouldAttemptConnectionClose() { + if (connection_close_attempts_ == connection_close_limit_) { + if (connection_close_limit_ * 2 <= kMaxSizeT) + connection_close_limit_ *= 2; + else + connection_close_limit_ = kMaxSizeT; + return true; + } + return false; +} + // Transmits either a protocol or application connection // close to the peer. The choice of which is send is // based on the current value of last_error_. bool QuicSession::SendConnectionClose() { - CHECK(!Ngtcp2CallbackScope::InNgtcp2CallbackScope(this)); + CHECK(!NgCallbackScope::InNgCallbackScope(this)); // Do not send any frames at all if we're in the draining period // or in the middle of a silent close @@ -2103,10 +2192,9 @@ bool QuicSession::SendConnectionClose() { // it multiple times; whereas for clients, we will serialize it // once and send once only. QuicError error = last_error(); - Debug(this, "Connection Close code: %" PRIu64 " (family: %s)", + Debug(this, "Sending connection close with code: %" PRIu64 " (family: %s)", error.code, error.family_name()); - Debug(this, "Setting the connection/draining period timer"); UpdateClosingTimer(); // If initial keys have not yet been installed, use the alternative @@ -2215,20 +2303,10 @@ bool QuicSession::SendPacket(std::unique_ptr packet) { // Sends any pending handshake or session packet data. void QuicSession::SendPendingData() { - // Do not proceed if: - // * We are in the ngtcp2 callback scope - // * The QuicSession has been destroyed - // * The QuicSession is in the draining period - // * The QuicSession is a server in the closing period - // * The QuicSession is not currently associated with a QuicSocket - if (Ngtcp2CallbackScope::InNgtcp2CallbackScope(this) || - is_destroyed() || - is_in_draining_period() || - (is_server() && is_in_closing_period()) || - socket() == nullptr) { + if (is_unable_to_send_packets()) return; - } + Debug(this, "Sending pending data"); if (!application_->SendPendingData()) { Debug(this, "Error sending QUIC application data"); HandleError(); @@ -2345,9 +2423,6 @@ bool QuicSession::StartClosingPeriod() { // Called by ngtcp2 when a stream has been closed. If the stream does // not exist, the close is ignored. void QuicSession::StreamClose(int64_t stream_id, uint64_t app_error_code) { - if (is_destroyed()) - return; - Debug(this, "Closing stream %" PRId64 " with code %" PRIu64, stream_id, app_error_code); @@ -2355,22 +2430,6 @@ void QuicSession::StreamClose(int64_t stream_id, uint64_t app_error_code) { application_->StreamClose(stream_id, app_error_code); } -// Called by ngtcp2 when a stream has been opened. All we do is log -// the activity here. We do not want to actually commit any resources -// until data is received for the stream. This allows us to prevent -// a stream commitment attack. The only exception is shutting the -// stream down explicitly if we are in a graceful close period. -void QuicSession::StreamOpen(int64_t stream_id) { - if (is_graceful_closing()) { - ngtcp2_conn_shutdown_stream( - connection(), - stream_id, - NGTCP2_ERR_CLOSING); - } - Debug(this, "Stream %" PRId64 " opened", stream_id); - return application_->StreamOpen(stream_id); -} - // Called when the QuicSession has received a RESET_STREAM frame from the // peer, indicating that it will no longer send additional frames for the // stream. If the stream is not yet known, reset is ignored. If the stream @@ -2393,9 +2452,6 @@ void QuicSession::StreamReset( int64_t stream_id, uint64_t final_size, uint64_t app_error_code) { - if (is_destroyed()) - return; - Debug(this, "Reset stream %" PRId64 " with code %" PRIu64 " and final size %" PRIu64, @@ -2465,7 +2521,7 @@ void QuicSession::UpdateClosingTimer() { // serialized at this point as well. However, WritePackets does not // serialize stream data that is being sent initially. bool QuicSession::WritePackets(const char* diagnostic_label) { - CHECK(!Ngtcp2CallbackScope::InNgtcp2CallbackScope(this)); + CHECK(!NgCallbackScope::InNgCallbackScope(this)); // During either the draining or closing period, // we are not permitted to send any additional packets. @@ -2533,6 +2589,33 @@ void QuicSession::MemoryInfo(MemoryTracker* tracker) const { StatsBase::StatsMemoryInfo(tracker); } +void QuicSession::ExtendMaxStreamsBidi(uint64_t max_streams) { + state_->max_streams_bidi = max_streams; +} + +void QuicSession::ExtendMaxStreamsUni(uint64_t max_streams) { + state_->max_streams_uni = max_streams; +} + +void QuicSession::ExtendMaxStreamsRemoteUni(uint64_t max_streams) { + Debug(this, "Extend remote max unidirectional streams: %" PRIu64, + max_streams); + application_->ExtendMaxStreamsRemoteUni(max_streams); +} + +void QuicSession::ExtendMaxStreamsRemoteBidi(uint64_t max_streams) { + Debug(this, "Extend remote max bidirectional streams: %" PRIu64, + max_streams); + application_->ExtendMaxStreamsRemoteBidi(max_streams); +} + +void QuicSession::ExtendMaxStreamData(int64_t stream_id, uint64_t max_data) { + Debug(this, + "Extending max stream %" PRId64 " data to %" PRIu64, + stream_id, max_data); + application_->ExtendMaxStreamData(stream_id, max_data); +} + // Static function to create a new server QuicSession instance BaseObjectPtr QuicSession::CreateServer( QuicSocket* socket, @@ -2817,11 +2900,16 @@ int QuicSession::OnReceiveCryptoData( size_t datalen, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); return session->crypto_context()->Receive( - crypto_level, offset, data, datalen); + crypto_level, + offset, + data, + datalen); } // Called by ngtcp2 for both client and server connections @@ -2832,9 +2920,11 @@ int QuicSession::OnExtendMaxStreamsBidi( uint64_t max_streams, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->ExtendMaxStreamsBidi(max_streams); return 0; } @@ -2847,9 +2937,11 @@ int QuicSession::OnExtendMaxStreamsUni( uint64_t max_streams, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->ExtendMaxStreamsUni(max_streams); return 0; } @@ -2864,9 +2956,11 @@ int QuicSession::OnExtendMaxStreamsRemoteUni( uint64_t max_streams, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->ExtendMaxStreamsRemoteUni(max_streams); return 0; } @@ -2881,9 +2975,11 @@ int QuicSession::OnExtendMaxStreamsRemoteBidi( uint64_t max_streams, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->ExtendMaxStreamsRemoteUni(max_streams); return 0; } @@ -2901,9 +2997,11 @@ int QuicSession::OnExtendMaxStreamData( void* user_data, void* stream_user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->ExtendMaxStreamData(stream_id, max_data); return 0; } @@ -2916,15 +3014,15 @@ int QuicSession::OnConnectionIDStatus( const uint8_t* token, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicCID qcid(cid); - Debug(session, "Updating connection ID %s (has reset token? %s)", - qcid, - token == nullptr ? "No" : "Yes"); - if (token != nullptr) + if (token != nullptr) { + QuicCID qcid(cid); + Debug(session, "Updating connection ID %s with reset token", qcid); session->UpdateConnectionID(type, qcid, StatelessResetToken(token)); + } return 0; } @@ -2937,10 +3035,13 @@ int QuicSession::OnConnectionIDStatus( int QuicSession::OnHandshakeCompleted( ngtcp2_conn* conn, void* user_data) { + QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->HandshakeCompleted(); return 0; } @@ -2951,9 +3052,11 @@ int QuicSession::OnHandshakeConfirmed( ngtcp2_conn* conn, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->HandshakeConfirmed(); return 0; } @@ -2974,11 +3077,17 @@ int QuicSession::OnReceiveStreamData( void* user_data, void* stream_user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); - return session->ReceiveStreamData(flags, stream_id, data, datalen, offset) ? - 0 : NGTCP2_ERR_CALLBACK_FAILURE; + + QuicSession::NgCallbackScope callback_scope(session); + return session->ReceiveStreamData( + flags, + stream_id, + data, + datalen, + offset) ? 0 : NGTCP2_ERR_CALLBACK_FAILURE; } // Called by ngtcp2 when a new stream has been opened @@ -2987,9 +3096,14 @@ int QuicSession::OnStreamOpen( int64_t stream_id, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - session->StreamOpen(stream_id); + + // We currently do not do anything with this callback. + // QuicStream instances are created implicitly once the + // first chunk of stream data is received. + return 0; } @@ -3005,9 +3119,11 @@ int QuicSession::OnAckedCryptoOffset( uint64_t datalen, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->crypto_context()->AcknowledgeCryptoData(crypto_level, datalen); return 0; } @@ -3025,9 +3141,11 @@ int QuicSession::OnAckedStreamDataOffset( void* user_data, void* stream_user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->AckedStreamDataOffset(stream_id, offset, datalen); return 0; } @@ -3045,9 +3163,11 @@ int QuicSession::OnSelectPreferredAddress( const ngtcp2_preferred_addr* paddr, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); // The paddr parameter contains the server advertised preferred // address. The dest parameter contains the address that is @@ -3073,9 +3193,11 @@ int QuicSession::OnStreamClose( void* user_data, void* stream_user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->StreamClose(stream_id, app_error_code); return 0; } @@ -3094,9 +3216,11 @@ int QuicSession::OnStreamReset( void* user_data, void* stream_user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->StreamReset(stream_id, final_size, app_error_code); return 0; } @@ -3133,11 +3257,13 @@ int QuicSession::OnGetNewConnectionID( size_t cidlen, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - CHECK(!Ngtcp2CallbackScope::InNgtcp2CallbackScope(session)); - return session->GetNewConnectionID(cid, token, cidlen) ? - 0 : NGTCP2_ERR_CALLBACK_FAILURE; + + CHECK(!NgCallbackScope::InNgCallbackScope(session)); + session->GetNewConnectionID(cid, token, cidlen); + return 0; } // When a connection is closed, ngtcp2 will call this multiple @@ -3149,9 +3275,12 @@ int QuicSession::OnRemoveConnectionID( const ngtcp2_cid* cid, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - session->RemoveConnectionID(QuicCID(cid)); + + if (session->is_server()) + session->socket()->DisassociateCID(QuicCID(cid)); return 0; } @@ -3172,9 +3301,11 @@ int QuicSession::OnPathValidation( ngtcp2_path_validation_result res, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->PathValidation(path, res); return 0; } @@ -3193,9 +3324,11 @@ int QuicSession::OnVersionNegotiation( size_t nsv, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; - QuicSession::Ngtcp2CallbackScope callback_scope(session); + + QuicSession::NgCallbackScope callback_scope(session); session->VersionNegotiation(sv, nsv); return 0; } @@ -3214,39 +3347,14 @@ int QuicSession::OnStatelessReset( const ngtcp2_pkt_stateless_reset* sr, void* user_data) { QuicSession* session = static_cast(user_data); + if (UNLIKELY(session->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; + session->set_stateless_reset(); return 0; } -BaseObjectPtr QuicSession::qlog_stream() { - if (!qlog_stream_) { - CHECK(qlog_stream_ = QLogStream::Create(env())); - listener_->OnQLog(qlog_stream_.get()); - } - return qlog_stream_; -} - -void QuicSession::OnQlogWrite(void* user_data, const void* data, size_t len) { - QuicSession* session = static_cast(user_data); - Environment* env = session->env(); - - // Fun fact... ngtcp2 does not emit the final qlog statement until the - // ngtcp2_conn object is destroyed. Ideally, destroying is explicit, - // but sometimes the QuicSession object can be garbage collected without - // being explicitly destroyed. During those times, we cannot call out - // to JavaScript. Because we don't know for sure if we're in in a GC - // when this is called, it is safer to just defer writes to immediate. - BaseObjectPtr ptr = session->qlog_stream(); - std::vector buffer(len); - memcpy(buffer.data(), data, len); - env->SetImmediate([ptr = std::move(ptr), - buffer = std::move(buffer)](Environment*) { - ptr->Emit(buffer.data(), buffer.size()); - }); -} - const ngtcp2_conn_callbacks QuicSession::callbacks[2] = { // NGTCP2_CRYPTO_SIDE_CLIENT { @@ -3316,6 +3424,33 @@ const ngtcp2_conn_callbacks QuicSession::callbacks[2] = { } }; +BaseObjectPtr QuicSession::qlog_stream() { + if (!qlog_stream_) { + CHECK(qlog_stream_ = QLogStream::Create(env())); + listener_->OnQLog(qlog_stream_.get()); + } + return qlog_stream_; +} + +void QuicSession::OnQlogWrite(void* user_data, const void* data, size_t len) { + QuicSession* session = static_cast(user_data); + Environment* env = session->env(); + + // Fun fact... ngtcp2 does not emit the final qlog statement until the + // ngtcp2_conn object is destroyed. Ideally, destroying is explicit, + // but sometimes the QuicSession object can be garbage collected without + // being explicitly destroyed. During those times, we cannot call out + // to JavaScript. Because we don't know for sure if we're in in a GC + // when this is called, it is safer to just defer writes to immediate. + BaseObjectPtr ptr = session->qlog_stream(); + std::vector buffer(len); + memcpy(buffer.data(), data, len); + env->SetImmediate([ptr = std::move(ptr), + buffer = std::move(buffer)](Environment*) { + ptr->Emit(buffer.data(), buffer.size()); + }); +} + BaseObjectPtr QLogStream::Create(Environment* env) { HandleScope scope(env->isolate()); diff --git a/src/quic/node_quic_session.h b/src/quic/node_quic_session.h index 2dcb62b9aa9e22..ee268519a972e5 100644 --- a/src/quic/node_quic_session.h +++ b/src/quic/node_quic_session.h @@ -574,28 +574,57 @@ class QuicApplication : public MemoryRetainer, inline explicit QuicApplication(QuicSession* session); virtual ~QuicApplication() = default; + // The QuicSession will call Initialize as soon as the TLS + // secrets have been set. See QuicCryptoContext::OnSecrets virtual bool Initialize() = 0; + + // QuicSession will forward all received stream data immediately + // on to the QuicApplication. The only additional processing the + // QuicSession does is to automatically adjust the QuicSession-level + // flow control window. It is up to the QuicApplication to do + // the same for the QuicStream-level flow control. + // + // flags are passed on directly from ngtcp2. The most important + // of which here is NGTCP2_STREAM_DATA_FLAG_FIN, which indicates + // that this is the final chunk of data that the peer will send + // for this stream. + // + // It is also possible for the NGTCP2_STREAM_DATA_FLAG_0RTT flag + // to be set, indicating that this chunk of data was received in + // a 0RTT packet before the TLS handshake completed. This would + // indicate that it is not as secure and could be replayed by + // an attacker. We're not currently making use of that flag. virtual bool ReceiveStreamData( uint32_t flags, int64_t stream_id, const uint8_t* data, size_t datalen, uint64_t offset) = 0; + virtual void AcknowledgeStreamData( int64_t stream_id, uint64_t offset, - size_t datalen) { Acknowledge(stream_id, offset, datalen); } + size_t datalen) { + Acknowledge(stream_id, offset, datalen); + } + virtual bool BlockStream(int64_t id) { return true; } + virtual void ExtendMaxStreamsRemoteUni(uint64_t max_streams) {} + virtual void ExtendMaxStreamsRemoteBidi(uint64_t max_streams) {} + virtual void ExtendMaxStreamData(int64_t stream_id, uint64_t max_data) {} + virtual void ResumeStream(int64_t stream_id) {} + virtual void SetSessionTicketAppData(const SessionTicketAppData& app_data) { // TODO(@jasnell): Different QUIC applications may wish to set some // application data in the session ticket (e.g. http/3 would set // server settings in the application data). For now, doing nothing // as I'm just adding the basic mechanism. } + virtual SessionTicketAppData::Status GetSessionTicketAppData( const SessionTicketAppData& app_data, SessionTicketAppData::Flag flag) { @@ -607,28 +636,34 @@ class QuicApplication : public MemoryRetainer, SessionTicketAppData::Status::TICKET_USE_RENEW : SessionTicketAppData::Status::TICKET_USE; } + virtual void StreamHeaders( int64_t stream_id, int kind, const std::vector>& headers, int64_t push_id = 0); + virtual void StreamClose( int64_t stream_id, uint64_t app_error_code); - virtual void StreamOpen(int64_t stream_id); + virtual void StreamReset( int64_t stream_id, uint64_t app_error_code); + virtual bool SubmitInformation( int64_t stream_id, v8::Local headers) { return false; } + virtual bool SubmitHeaders( int64_t stream_id, v8::Local headers, uint32_t flags) { return false; } + virtual bool SubmitTrailers( int64_t stream_id, v8::Local headers) { return false; } + virtual BaseObjectPtr SubmitPush( int64_t stream_id, v8::Local headers) { @@ -671,7 +706,7 @@ class QuicApplication : public MemoryRetainer, virtual bool StreamCommit(StreamData* data, size_t datalen) = 0; virtual bool ShouldSetFin(const StreamData& data) = 0; - inline ssize_t WriteVStream( + ssize_t WriteVStream( QuicPathStorage* path, uint8_t* buf, ssize_t* ndatalen, @@ -926,7 +961,7 @@ class QuicSession final : public AsyncWrap, inline bool is_handshake_completed() const; // Checks to see if data needs to be retransmitted - void MaybeTimeout(); + void OnRetransmitTimeout(); // Called when the session has been determined to have been // idle for too long and needs to be torn down. @@ -967,6 +1002,8 @@ class QuicSession final : public AsyncWrap, // Causes pending ngtcp2 frames to be serialized and sent void SendPendingData(); + inline void ShutdownStream(int64_t stream_id, uint64_t code); + inline bool SendPacket( std::unique_ptr packet, const ngtcp2_path_storage& path); @@ -1056,6 +1093,8 @@ class QuicSession final : public AsyncWrap, void RemoveListener(QuicSessionListener* listener); + inline bool is_unable_to_send_packets(); + inline void set_connection_id_strategy( ConnectionIDStrategy strategy); @@ -1089,7 +1128,7 @@ class QuicSession final : public AsyncWrap, SendSessionScope(const SendSessionScope& other) = delete; ~SendSessionScope() { - if (Ngtcp2CallbackScope::InNgtcp2CallbackScope(session_.get()) || + if (NgCallbackScope::InNgCallbackScope(session_.get()) || session_->is_in_closing_period() || session_->is_in_draining_period()) { return; @@ -1121,7 +1160,7 @@ class QuicSession final : public AsyncWrap, ~ConnectionCloseScope() { if (silent_ || - Ngtcp2CallbackScope::InNgtcp2CallbackScope(session_.get()) || + NgCallbackScope::InNgCallbackScope(session_.get()) || session_->is_in_closing_period() || session_->is_in_draining_period()) { return; @@ -1138,21 +1177,21 @@ class QuicSession final : public AsyncWrap, // Tracks whether or not we are currently within an ngtcp2 callback // function. Certain ngtcp2 APIs are not supposed to be called when // within a callback. We use this as a gate to check. - class Ngtcp2CallbackScope final { + class NgCallbackScope final { public: - explicit Ngtcp2CallbackScope(QuicSession* session) : session_(session) { + explicit NgCallbackScope(QuicSession* session) : session_(session) { CHECK(session_); - CHECK(!InNgtcp2CallbackScope(session)); + CHECK(!InNgCallbackScope(session)); session_->set_in_ngtcp2_callback(); } - Ngtcp2CallbackScope(const Ngtcp2CallbackScope& other) = delete; + NgCallbackScope(const NgCallbackScope& other) = delete; - ~Ngtcp2CallbackScope() { + ~NgCallbackScope() { session_->set_in_ngtcp2_callback(false); } - static bool InNgtcp2CallbackScope(QuicSession* session) { + static bool InNgCallbackScope(QuicSession* session) { return session->is_in_ngtcp2_callback(); } @@ -1192,36 +1231,32 @@ class QuicSession final : public AsyncWrap, v8::Local dcid, QlogMode qlog); - inline void InitApplication(); + bool InitApplication(); void AckedStreamDataOffset( int64_t stream_id, uint64_t offset, uint64_t datalen); - inline void AssociateCID(const QuicCID& cid); - - inline void DisassociateCID(const QuicCID& cid); - - inline void ExtendMaxStreamData(int64_t stream_id, uint64_t max_data); + void ExtendMaxStreamData(int64_t stream_id, uint64_t max_data); void ExtendMaxStreams(bool bidi, uint64_t max_streams); - inline void ExtendMaxStreamsUni(uint64_t max_streams); + void ExtendMaxStreamsUni(uint64_t max_streams); - inline void ExtendMaxStreamsBidi(uint64_t max_streams); + void ExtendMaxStreamsBidi(uint64_t max_streams); - inline void ExtendMaxStreamsRemoteUni(uint64_t max_streams); + void ExtendMaxStreamsRemoteUni(uint64_t max_streams); - inline void ExtendMaxStreamsRemoteBidi(uint64_t max_streams); + void ExtendMaxStreamsRemoteBidi(uint64_t max_streams); - bool GetNewConnectionID(ngtcp2_cid* cid, uint8_t* token, size_t cidlen); + void GetNewConnectionID(ngtcp2_cid* cid, uint8_t* token, size_t cidlen); - inline void GetConnectionCloseInfo(); + void GetConnectionCloseInfo(); - inline void HandshakeCompleted(); + void HandshakeCompleted(); - inline void HandshakeConfirmed(); + void HandshakeConfirmed(); void PathValidation( const ngtcp2_path* path, @@ -1229,18 +1264,12 @@ class QuicSession final : public AsyncWrap, bool ReceivePacket(ngtcp2_path* path, const uint8_t* data, ssize_t nread); - inline void RemoveConnectionID(const QuicCID& cid); - void ScheduleRetransmit(); bool SendPacket(std::unique_ptr packet); - inline void set_local_address(const ngtcp2_addr* addr); - void StreamClose(int64_t stream_id, uint64_t app_error_code); - void StreamOpen(int64_t stream_id); - void StreamReset( int64_t stream_id, uint64_t final_size, @@ -1257,9 +1286,21 @@ class QuicSession final : public AsyncWrap, void UpdateDataStats(); - inline void UpdateEndpoint(const ngtcp2_path& path); + void UpdateEndpoint(const ngtcp2_path& path); + + void VersionNegotiation(const uint32_t* sv, size_t nsv); - inline void VersionNegotiation(const uint32_t* sv, size_t nsv); + void UpdateIdleTimer(); + + void UpdateClosingTimer(); + + void UpdateRetransmitTimer(uint64_t timeout); + + bool StartClosingPeriod(); + + void IncrementConnectionCloseAttempts(); + + bool ShouldAttemptConnectionClose(); static int OnReceiveCryptoData( ngtcp2_conn* conn, @@ -1402,18 +1443,6 @@ class QuicSession final : public AsyncWrap, static void OnQlogWrite(void* user_data, const void* data, size_t len); - void UpdateIdleTimer(); - - void UpdateClosingTimer(); - - inline void UpdateRetransmitTimer(uint64_t timeout); - - inline void StopRetransmitTimer(); - - inline void StopIdleTimer(); - - bool StartClosingPeriod(); - #define V(id, _) QUICSESSION_FLAG_##id, enum QuicSessionFlags : uint32_t { QUICSESSION_FLAGS(V) @@ -1421,36 +1450,6 @@ class QuicSession final : public AsyncWrap, }; #undef V - void IncrementConnectionCloseAttempts() { - if (connection_close_attempts_ < kMaxSizeT) - connection_close_attempts_++; - } - - bool ShouldAttemptConnectionClose() { - if (connection_close_attempts_ == connection_close_limit_) { - if (connection_close_limit_ * 2 <= kMaxSizeT) - connection_close_limit_ *= 2; - else - connection_close_limit_ = kMaxSizeT; - return true; - } - return false; - } - - typedef ssize_t(*ngtcp2_close_fn)( - ngtcp2_conn* conn, - ngtcp2_path* path, - uint8_t* dest, - size_t destlen, - uint64_t error_code, - ngtcp2_tstamp ts); - - static inline ngtcp2_close_fn SelectCloseFn(uint32_t family) { - return family == QUIC_ERROR_APPLICATION ? - ngtcp2_conn_write_application_close : - ngtcp2_conn_write_connection_close; - } - // Select the QUIC Application based on the configured ALPN identifier QuicApplication* SelectApplication(QuicSession* session); diff --git a/src/quic/node_quic_socket.cc b/src/quic/node_quic_socket.cc index 8c6fc253dad6dc..ae0847d8d27bd9 100644 --- a/src/quic/node_quic_socket.cc +++ b/src/quic/node_quic_socket.cc @@ -560,16 +560,12 @@ void QuicSocket::OnReceive( IncrementStat(&QuicSocketStats::packets_ignored); return; } - - // The QuicSession was destroyed while it was being set up. There's - // no further processing we can do here. - if (session->is_destroyed()) { - IncrementStat(&QuicSocketStats::packets_ignored); - return; - } } CHECK(session); + // If the QuicSession is already destroyed, there's nothing to do. + if (session->is_destroyed()) + return IncrementStat(&QuicSocketStats::packets_ignored); // If the packet could not successfully processed for any reason (possibly // due to being malformed or malicious in some way) we mark it ignored. diff --git a/src/quic/node_quic_stream-inl.h b/src/quic/node_quic_stream-inl.h index e253875c4d3389..e79a326359c6e9 100644 --- a/src/quic/node_quic_stream-inl.h +++ b/src/quic/node_quic_stream-inl.h @@ -25,11 +25,11 @@ QuicStreamOrigin QuicStream::origin() const { void QuicStream::set_final_size(uint64_t final_size) { // Only set the final size once. - if (is_fin()) { + if (state_->fin_received == 1) { CHECK_LE(final_size, GetStat(&QuicStreamStats::final_size)); return; } - set_fin(true); + state_->fin_received = 1; SetStat(&QuicStreamStats::final_size, final_size); } @@ -56,12 +56,24 @@ bool QuicStream::was_ever_readable() const { return true; } +void QuicStream::set_fin_sent() { + Debug(this, "final stream frame sent"); + state_->fin_sent = 1; + if (shutdown_done_ != nullptr) { + shutdown_done_(0); + } +} + +void QuicStream::set_destroyed() { + destroyed_ = true; +} + bool QuicStream::is_readable() const { - return was_ever_readable() && !is_read_closed(); + return was_ever_readable() && state_->read_ended == 0; } bool QuicStream::is_write_finished() const { - return is_fin_sent() && streambuf_.length() == 0; + return state_->fin_sent == 1 && streambuf_.length() == 0; } bool QuicStream::SubmitInformation(v8::Local headers) { @@ -118,11 +130,8 @@ void QuicStream::Commit(size_t amount) { // STOP_SENDING frame will be sent. void QuicStream::ResetStream(uint64_t app_error_code) { QuicSession::SendSessionScope send_scope(session()); - ngtcp2_conn_shutdown_stream( - session()->connection(), - stream_id_, - app_error_code); - set_read_closed(); + session()->ShutdownStream(id(), app_error_code); + state_->read_ended = 1; streambuf_.Cancel(); streambuf_.End(); } @@ -135,7 +144,22 @@ void QuicStream::StopSending(uint64_t app_error_code) { session()->connection(), stream_id_, app_error_code); - set_read_closed(); + state_->read_ended = 1; +} + +void QuicStream::CancelPendingWrites() { + // In case this stream is scheduled for sending data, remove it + // from the schedule queue + Unschedule(); + + // If there is data currently buffered in the streambuf_, + // then cancel will call out to invoke an arbitrary + // JavaScript callback (the on write callback). Within + // that callback, however, the QuicStream will no longer + // be usable to send or receive data. + streambuf_.End(); + streambuf_.Cancel(); + CHECK_EQ(streambuf_.length(), 0); } void QuicStream::Schedule(Queue* queue) { diff --git a/src/quic/node_quic_stream.cc b/src/quic/node_quic_stream.cc index 3b4e7286945957..57b797528a03cb 100644 --- a/src/quic/node_quic_stream.cc +++ b/src/quic/node_quic_stream.cc @@ -1,4 +1,5 @@ #include "node_quic_stream-inl.h" // NOLINT(build/include) +#include "aliased_struct-inl.h" #include "async_wrap-inl.h" #include "debug_utils-inl.h" #include "env-inl.h" @@ -29,6 +30,7 @@ using v8::Isolate; using v8::Local; using v8::Object; using v8::ObjectTemplate; +using v8::PropertyAttribute; using v8::String; using v8::Value; @@ -48,10 +50,18 @@ QuicStream::QuicStream( session_(sess), stream_id_(stream_id), push_id_(push_id), + state_(sess->env()->isolate()), quic_state_(sess->quic_state()) { CHECK_NOT_NULL(sess); Debug(this, "Created"); StreamBase::AttachToObject(GetObject()); + + wrap->DefineOwnProperty( + env()->context(), + env()->state_string(), + state_.GetArrayBuffer(), + PropertyAttribute::ReadOnly).Check(); + ngtcp2_transport_params params; ngtcp2_conn_get_local_transport_params(session()->connection(), ¶ms); IncrementStat(&QuicStreamStats::max_offset, params.initial_max_data); @@ -119,55 +129,52 @@ std::string QuicStream::diagnostic_name() const { } void QuicStream::Destroy(QuicError* error) { - if (is_destroyed()) + if (destroyed_) return; + destroyed_ = true; - QuicSession::SendSessionScope send_scope(session()); - - set_read_closed(); - set_destroyed(); + if (is_writable() || is_readable()) + session()->ShutdownStream(id(), 0); - // In case this stream is scheduled for sending, remove it - // from the schedule queue - Unschedule(); + CancelPendingWrites(); - // If there is data currently buffered in the streambuf_, - // then cancel will call out to invoke an arbitrary - // JavaScript callback (the on write callback). Within - // that callback, however, the QuicStream will no longer - // be usable to send or receive data. - streambuf_.End(); - streambuf_.Cancel(); - CHECK_EQ(streambuf_.length(), 0); - - // Attempt to send a shutdown signal to the remote peer - ResetStream(error != nullptr ? error->code : NGTCP2_NO_ERROR); - - // The QuicSession maintains a map of std::unique_ptrs to - // QuicStream instances. Removing this here will cause - // this QuicStream object to be deconstructed, so the - // QuicStream object will no longer exist after this point. session_->RemoveStream(stream_id_); } // Do shutdown is called when the JS stream writable side is closed. // If we're not within an ngtcp2 callback, this will trigger the -// QuicSession to send any pending data. Any time after this is -// called, a final stream frame will be sent for this QuicStream, -// but it may not be sent right away. +// QuicSession to send any pending data. If a final stream frame +// has not already been sent, it will be after this. int QuicStream::DoShutdown(ShutdownWrap* req_wrap) { if (is_destroyed()) return UV_EPIPE; + // If the fin bit has already been sent, we can return + // immediately because there's nothing else to do. The + // _final callback will be invoked immediately. + if (state_->fin_sent || !is_writable()) { + Debug(this, "Shutdown write immediately"); + return 1; + } + Debug(this, "Deferred shutdown. Waiting for fin sent"); + + CHECK_NULL(shutdown_done_); + CHECK_NOT_NULL(req_wrap); + shutdown_done_ = std::move([=](int status) { + CHECK_NOT_NULL(req_wrap); + shutdown_done_ = nullptr; + req_wrap->Done(status); + }); + QuicSession::SendSessionScope send_scope(session()); - if (is_writable()) { - Debug(this, "Shutdown writable side"); - RecordTimestamp(&QuicStreamStats::closing_at); - streambuf_.End(); - session()->ResumeStream(stream_id_); - } - return 1; + Debug(this, "Shutdown writable side"); + RecordTimestamp(&QuicStreamStats::closing_at); + state_->write_ended = 1; + streambuf_.End(); + session()->ResumeStream(stream_id_); + + return 0; } int QuicStream::DoWrite( @@ -176,6 +183,7 @@ int QuicStream::DoWrite( size_t nbufs, uv_stream_t* send_handle) { CHECK_NULL(send_handle); + CHECK(!streambuf_.is_ended()); // A write should not have happened if we've been destroyed or // the QuicStream is no longer (or was never) writable. @@ -218,6 +226,16 @@ int QuicStream::DoWrite( req_wrap->Done(status); }); + // If end() was called on the JS side, the write_ended flag + // will have been set. This allows us to know early if this + // is the final chunk. But this is only only to be triggered + // if end() was called with a final chunk of data to write. + // Otherwise, we have to wait for DoShutdown to be called. + if (state_->write_ended == 1) { + RecordTimestamp(&QuicStreamStats::closing_at); + streambuf_.End(); + } + session()->ResumeStream(stream_id_); return 0; @@ -234,8 +252,8 @@ bool QuicStream::IsClosing() { int QuicStream::ReadStart() { CHECK(!is_destroyed()); CHECK(is_readable()); - set_read_started(); - set_read_paused(false); + state_->read_started = 1; + state_->read_paused = 0; IncrementStat( &QuicStreamStats::max_offset, inbound_consumed_data_while_paused_); @@ -246,7 +264,7 @@ int QuicStream::ReadStart() { int QuicStream::ReadStop() { CHECK(!is_destroyed()); CHECK(is_readable()); - set_read_paused(); + state_->read_paused = 1; return 0; } @@ -348,7 +366,7 @@ void QuicStream::ReceiveData( datalen -= avail; // Capture read_paused before EmitRead in case user code callbacks // alter the state when EmitRead is called. - bool read_paused = is_read_paused(); + bool read_paused = state_->read_paused == 1; EmitRead(avail, buf); // Reading can be paused while we are processing. If that's // the case, we still want to acknowledge the current bytes diff --git a/src/quic/node_quic_stream.h b/src/quic/node_quic_stream.h index 96727d8661b1ba..6cbdef2416f02d 100644 --- a/src/quic/node_quic_stream.h +++ b/src/quic/node_quic_stream.h @@ -4,6 +4,7 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "memory_tracker.h" +#include "aliased_struct.h" #include "async_wrap.h" #include "env.h" #include "node_http_common.h" @@ -79,13 +80,27 @@ struct QuicStreamStatsTraits { static void ToString(const Base& ptr, Fn&& add_field); }; -#define QUICSTREAM_FLAGS(V) \ - V(READ_CLOSED, read_closed) \ - V(READ_STARTED, read_started) \ - V(READ_PAUSED, read_paused) \ - V(FIN, fin) \ - V(FIN_SENT, fin_sent) \ - V(DESTROYED, destroyed) +#define QUICSTREAM_SHARED_STATE(V) \ + V(WRITE_ENDED, write_ended, uint8_t) \ + V(READ_STARTED, read_started, uint8_t) \ + V(READ_PAUSED, read_paused, uint8_t) \ + V(READ_ENDED, read_ended, uint8_t) \ + V(FIN_SENT, fin_sent, uint8_t) \ + V(FIN_RECEIVED, fin_received, uint8_t) + +#define V(_, name, type) type name; +struct QuicStreamState { + QUICSTREAM_SHARED_STATE(V); +}; +#undef V + +#define V(id, name, _) \ + IDX_QUICSTREAM_STATE_##id = offsetof(QuicStreamState, name), +enum QuicStreamStateFields { + QUICSTREAM_SHARED_STATE(V) + IDX_QUICSTREAM_STATE_END +}; +#undef V enum QuicStreamDirection { // The QuicStream is readable and writable in both directions @@ -212,13 +227,19 @@ class QuicStream : public AsyncWrap, // or the server. inline QuicStreamOrigin origin() const; + inline void set_fin_sent(); + + inline bool is_destroyed() const { return destroyed_; } + + inline void set_destroyed(); + // A QuicStream will not be writable if: // - The streambuf_ is ended // - It is a Unidirectional stream originating from the peer inline bool is_writable() const; // A QuicStream will not be readable if: - // - The QUICSTREAM_FLAG_READ_CLOSED flag is set or + // - The read ended flag is set or // - It is a Unidirectional stream originating from the local peer. inline bool is_readable() const; @@ -251,6 +272,8 @@ class QuicStream : public AsyncWrap, // Destroy the QuicStream and render it no longer usable. void Destroy(QuicError* error = nullptr); + inline void CancelPendingWrites(); + // Buffers chunks of data to be written to the QUIC connection. int DoWrite( WriteWrap* req_wrap, @@ -316,18 +339,6 @@ class QuicStream : public AsyncWrap, // Required for StreamBase int ReadStop() override; -#define V(id, name) \ - inline bool is_##name() const { \ - return flags_ & (1 << QUICSTREAM_FLAG_##id); } \ - inline void set_##name(bool on = true) { \ - if (on) \ - flags_ |= (1 << QUICSTREAM_FLAG_##id); \ - else \ - flags_ &= ~(1 << QUICSTREAM_FLAG_##id); \ - } - QUICSTREAM_FLAGS(V) -#undef V - // Required for StreamBase int DoShutdown(ShutdownWrap* req_wrap) override; @@ -362,19 +373,14 @@ class QuicStream : public AsyncWrap, void IncrementStats(size_t datalen); -#define V(id, _) QUICSTREAM_FLAG_##id, - enum QuicStreamStates : uint32_t { - QUICSTREAM_FLAGS(V) - QUICSTREAM_FLAG_COUNT - }; -#undef V - BaseObjectWeakPtr session_; QuicBuffer streambuf_; int64_t stream_id_ = 0; int64_t push_id_ = 0; - uint32_t flags_ = 0; + bool destroyed_ = false; + AliasedStruct state_; + DoneCB shutdown_done_ = nullptr; size_t inbound_consumed_data_while_paused_ = 0; diff --git a/src/stream_base.cc b/src/stream_base.cc index b35df39afe9d8c..b4008a304a1217 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -68,7 +68,6 @@ int StreamBase::UseUserBuffer(const FunctionCallbackInfo& args) { int StreamBase::Shutdown(const FunctionCallbackInfo& args) { CHECK(args[0]->IsObject()); Local req_wrap_obj = args[0].As(); - return Shutdown(req_wrap_obj); } diff --git a/test/parallel/test-quic-client-server.js b/test/parallel/test-quic-client-server.js index 23295ed2334bbb..6574d734947262 100644 --- a/test/parallel/test-quic-client-server.js +++ b/test/parallel/test-quic-client-server.js @@ -15,10 +15,17 @@ const { } } = internalBinding('quic'); +const qlog = process.env.NODE_QLOG === '1'; + const { Buffer } = require('buffer'); const Countdown = require('../common/countdown'); const assert = require('assert'); -const fs = require('fs'); +const { + createReadStream, + createWriteStream, + readFileSync +} = require('fs'); +const { pipeline } = require('stream'); const { key, cert, @@ -26,7 +33,7 @@ const { debug, } = require('../common/quic'); -const filedata = fs.readFileSync(__filename, { encoding: 'utf8' }); +const filedata = readFileSync(__filename, { encoding: 'utf8' }); const { createQuicSocket } = require('net'); @@ -37,10 +44,11 @@ const unidata = ['I wonder if it worked.', 'test']; const kServerName = 'agent2'; // Intentionally the wrong servername const kALPN = 'zzz'; // ALPN can be overriden to whatever we want -const options = { key, cert, ca, alpn: kALPN }; +const options = { key, cert, ca, alpn: kALPN, qlog }; -const client = createQuicSocket({ client: options }); +const client = createQuicSocket({ qlog, client: options }); const server = createQuicSocket({ + qlog, validateAddress: true, statelessResetSecret: kStatelessResetToken, server: options @@ -74,7 +82,8 @@ client.on('endpointClose', common.mustCall()); client.on('close', common.mustCall(onSocketClose.bind(client))); (async function() { - server.on('session', common.mustCall((session) => { + server.on('session', common.mustCall(async (session) => { + if (qlog) session.qlog.pipe(createWriteStream('server.qlog')); debug('QuicServerSession Created'); assert.strictEqual(session.maxStreams.bidi, 100); @@ -92,8 +101,6 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); debug(`QuicServerSession Client ${family} address ${address}:${port}`); } - session.on('usePreferredAddress', common.mustNotCall()); - session.on('clientHello', common.mustCall( (alpn, servername, ciphers, cb) => { assert.strictEqual(alpn, kALPN); @@ -102,34 +109,33 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); cb(); })); - session.on('OCSPRequest', common.mustCall( - (servername, context, cb) => { - debug('QuicServerSession received a OCSP request'); - assert.strictEqual(servername, kServerName); + session.on('OCSPRequest', common.mustCall((servername, context, cb) => { + debug('QuicServerSession received a OCSP request'); + assert.strictEqual(servername, kServerName); - // This will be a SecureContext. By default it will - // be the SecureContext used to create the QuicSession. - // If the user wishes to do something with it, it can, - // but if it wishes to pass in a new SecureContext, - // it can pass it in as the second argument to the - // callback below. - assert(context); - debug('QuicServerSession Certificate: ', context.getCertificate()); - debug('QuicServerSession Issuer: ', context.getIssuer()); - - // The callback can be invoked asynchronously - setImmediate(() => { - // The first argument is a potential error, - // in which case the session will be destroyed - // immediately. - // The second is an optional new SecureContext - // The third is the ocsp response. - // All arguments are optional - cb(null, null, Buffer.from('hello')); - }); - })); + // This will be a SecureContext. By default it will + // be the SecureContext used to create the QuicSession. + // If the user wishes to do something with it, it can, + // but if it wishes to pass in a new SecureContext, + // it can pass it in as the second argument to the + // callback below. + assert(context); + debug('QuicServerSession Certificate: ', context.getCertificate()); + debug('QuicServerSession Issuer: ', context.getIssuer()); + + // The callback can be invoked asynchronously + setImmediate(() => { + // The first argument is a potential error, + // in which case the session will be destroyed + // immediately. + // The second is an optional new SecureContext + // The third is the ocsp response. + // All arguments are optional + cb(null, null, Buffer.from('hello')); + }); + })); - session.on('secure', common.mustCall(async (servername, alpn, cipher) => { + session.on('secure', common.mustCall((servername, alpn, cipher) => { debug('QuicServerSession TLS Handshake Complete'); debug(' Server name: %s', servername); debug(' ALPN: %s', alpn); @@ -137,43 +143,49 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); assert.strictEqual(session.servername, servername); assert.strictEqual(servername, kServerName); assert.strictEqual(session.alpnProtocol, alpn); - assert.strictEqual(session.getPeerCertificate().subject.CN, 'agent1'); - assert(session.authenticated); assert.strictEqual(session.authenticationError, undefined); + })); - const uni = await session.openStream({ halfOpen: true }); - assert(uni.unidirectional); - assert(!uni.bidirectional); - assert(uni.serverInitiated); - assert(!uni.clientInitiated); - assert(!uni.pending); - // The data and end events will never emit because - // the unidirectional stream is never readable. - uni.on('end', common.mustNotCall()); - uni.on('data', common.mustNotCall()); - uni.write(unidata[0], common.mustCall()); - uni.end(unidata[1], common.mustCall()); - uni.on('finish', common.mustCall()); - uni.on('close', common.mustCall(() => { - assert.strictEqual(uni.finalSize, 0); - })); - debug('Unidirectional, Server-initiated stream %d opened', uni.id); + const uni = await session.openStream({ halfOpen: true }); + debug('Unidirectional, Server-initiated stream %d opened', uni.id); + assert(uni.writable); + assert(!uni.readable); + assert(uni.unidirectional); + assert(!uni.bidirectional); + assert(uni.serverInitiated); + assert(!uni.clientInitiated); + uni.on('end', common.mustNotCall()); + uni.on('data', common.mustNotCall()); + uni.write(unidata[0], common.mustCall()); + uni.end(unidata[1]); + // TODO(@jasnell): There's currently a bug where the final + // write callback is not invoked if the stream/session is + // destroyed before we receive the acknowledgement for the + // write. + // uni.end(unidata[1], common.mustCall()); + // uni.on('finish', common.mustCall()); + uni.on('close', common.mustCall(() => { + assert.strictEqual(uni.finalSize, 0); })); session.on('stream', common.mustCall((stream) => { debug('Bidirectional, Client-initiated stream %d received', stream.id); assert.strictEqual(stream.id, 0); assert.strictEqual(stream.session, session); + assert(stream.writable); + assert(stream.readable); assert(stream.bidirectional); assert(!stream.unidirectional); assert(stream.clientInitiated); assert(!stream.serverInitiated); - const file = fs.createReadStream(__filename); let data = ''; - file.pipe(stream); + pipeline(createReadStream(__filename), stream, common.mustCall((err) => { + assert.ifError(err); + })); + stream.setEncoding('utf8'); stream.on('blocked', common.mustNotCall()); stream.on('data', (chunk) => { @@ -245,6 +257,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); servername: kServerName, requestOCSP: true, }); + if (qlog) req.qlog.pipe(createWriteStream('client.qlog')); assert.strictEqual(req.servername, kServerName); @@ -293,43 +306,13 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); code: 'ERR_QUIC_VERIFY_HOSTNAME_MISMATCH', message: 'Hostname mismatch' }); - - { - const { - address, - family, - port - } = req.remoteAddress; - const endpoint = server.endpoints[0].address; - assert.strictEqual(port, endpoint.port); - assert.strictEqual(family, endpoint.family); - debug(`QuicClientSession Server ${family} address ${address}:${port}`); - } - - const file = fs.createReadStream(__filename); - const stream = await req.openStream(); - file.pipe(stream); - let data = ''; - stream.resume(); - stream.setEncoding('utf8'); - stream.on('blocked', common.mustNotCall()); - stream.on('data', (chunk) => data += chunk); - stream.on('finish', common.mustCall()); - stream.on('end', common.mustCall(() => { - assert.strictEqual(data, filedata); - debug('Client received expected data for stream %d', stream.id); - })); - stream.on('close', common.mustCall(() => { - debug('Bidirectional, Client-initiated stream %d closed', stream.id); - assert.strictEqual(stream.finalSize, filedata.length); - countdown.dec(); - })); - debug('Bidirectional, Client-initiated stream %d opened', stream.id); })); req.on('stream', common.mustCall((stream) => { debug('Unidirectional, Server-initiated stream %d received', stream.id); let data = ''; + assert(stream.readable); + assert(!stream.writable); stream.setEncoding('utf8'); stream.on('data', (chunk) => data += chunk); stream.on('end', common.mustCall(() => { @@ -352,4 +335,37 @@ client.on('close', common.mustCall(onSocketClose.bind(client))); assert.strictEqual(code, NGTCP2_NO_ERROR); assert.strictEqual(family, QUIC_ERROR_APPLICATION); })); + + { + const { + address, + family, + port + } = req.remoteAddress; + const endpoint = server.endpoints[0].address; + assert.strictEqual(port, endpoint.port); + assert.strictEqual(family, endpoint.family); + debug(`QuicClientSession Server ${family} address ${address}:${port}`); + } + + const stream = await req.openStream(); + pipeline(createReadStream(__filename), stream, common.mustCall((err) => { + assert.ifError(err); + })); + let data = ''; + stream.resume(); + stream.setEncoding('utf8'); + stream.on('finish', common.mustCall()); + stream.on('blocked', common.mustNotCall()); + stream.on('data', (chunk) => data += chunk); + stream.on('end', common.mustCall(() => { + assert.strictEqual(data, filedata); + debug('Client received expected data for stream %d', stream.id); + })); + stream.on('close', common.mustCall(() => { + debug('Bidirectional, Client-initiated stream %d closed', stream.id); + assert.strictEqual(stream.finalSize, filedata.length); + countdown.dec(); + })); + debug('Bidirectional, Client-initiated stream %d opened', stream.id); })().then(common.mustCall()); diff --git a/test/parallel/test-quic-http3-push.js b/test/parallel/test-quic-http3-push.js index 77c4b164b0e57f..9b5d45f8230e4a 100644 --- a/test/parallel/test-quic-http3-push.js +++ b/test/parallel/test-quic-http3-push.js @@ -12,11 +12,14 @@ const assert = require('assert'); const { key, cert, ca, kHttp3Alpn } = require('../common/quic'); const { createQuicSocket } = require('net'); +const { createWriteStream } = require('fs'); -const options = { key, cert, ca, alpn: kHttp3Alpn }; +const qlog = process.env.NODE_QLOG === '1'; -const client = createQuicSocket({ client: options }); -const server = createQuicSocket({ server: options }); +const options = { key, cert, ca, alpn: kHttp3Alpn, qlog }; + +const client = createQuicSocket({ qlog, client: options }); +const server = createQuicSocket({ qlog, server: options }); client.on('close', common.mustCall()); server.on('close', common.mustCall()); @@ -28,7 +31,7 @@ const countdown = new Countdown(2, () => { (async function() { server.on('session', common.mustCall((session) => { - + if (qlog) session.qlog.pipe(createWriteStream('server.qlog')); session.on('stream', common.mustCall((stream) => { assert(stream.submitInitialHeaders({ ':status': '200' })); @@ -58,7 +61,9 @@ const countdown = new Countdown(2, () => { push.submitInitialHeaders({ ':status': '200' }); push.end('testing'); push.on('close', common.mustCall()); - push.on('finish', common.mustCall()); + // TODO(@jasnell): There's current a bug where the last + // _final callback is not being invoked in this case + // push.on('finish', common.mustCall()); stream.end('hello world'); stream.resume(); @@ -78,7 +83,6 @@ const countdown = new Countdown(2, () => { stream.on('informationalHeaders', common.mustNotCall()); stream.on('trailingHeaders', common.mustNotCall()); })); - session.on('close', common.mustCall()); })); @@ -90,6 +94,7 @@ const countdown = new Countdown(2, () => { maxStreamsUni: 10, h3: { maxPushes: 10 } }); + if (qlog) req.qlog.pipe(createWriteStream('client.qlog')); req.on('stream', common.mustCall((stream) => { let data = ''; diff --git a/test/parallel/test-quic-quicsession-resume.js b/test/parallel/test-quic-quicsession-resume.js index 85e48f48b10b2c..f8f8ad6d2746b4 100644 --- a/test/parallel/test-quic-quicsession-resume.js +++ b/test/parallel/test-quic-quicsession-resume.js @@ -17,12 +17,14 @@ const { debug, } = require('../common/quic'); +const { createWriteStream } = require('fs'); const { createQuicSocket } = require('net'); -const options = { key, cert, ca, alpn: 'zzz' }; +const qlog = process.env.NODE_QLOG === '1'; +const options = { key, cert, ca, alpn: 'zzz', qlog }; -const server = createQuicSocket({ server: options }); -const client = createQuicSocket({ client: options }); +const server = createQuicSocket({ qlog, server: options }); +const client = createQuicSocket({ qlog, client: options }); const countdown = new Countdown(2, () => { server.close(); @@ -30,12 +32,13 @@ const countdown = new Countdown(2, () => { }); (async function() { + let counter = 0; server.on('session', common.mustCall((session) => { - session.on('secure', common.mustCall(() => { - assert(session.usingEarlyData); - })); - + if (qlog) session.qlog.pipe(createWriteStream(`server-${counter++}.qlog`)); session.on('stream', common.mustCall((stream) => { + stream.on('close', common.mustCall()); + assert(stream.unidirectional); + assert(!stream.writable); stream.resume(); })); }, 2)); @@ -49,6 +52,7 @@ const countdown = new Countdown(2, () => { address: common.localhostIPv4, port: server.endpoints[0].address.port, }); + if (qlog) req.qlog.pipe(createWriteStream(`client-${counter}.qlog`)); req.on('sessionTicket', common.mustCall((ticket, params) => { assert(ticket instanceof Buffer); @@ -61,9 +65,7 @@ const countdown = new Countdown(2, () => { const stream = await req.openStream({ halfOpen: true }); stream.end('hello'); - stream.resume(); stream.on('close', () => { - req.close(); countdown.dec(); // Wait a turn then start a new session using the stored // ticket and transportParameters @@ -77,6 +79,7 @@ const countdown = new Countdown(2, () => { sessionTicket, remoteTransportParams }); + if (qlog) req.qlog.pipe(createWriteStream('client2.qlog')); assert(req.allowEarlyData); @@ -91,5 +94,4 @@ const countdown = new Countdown(2, () => { // true when the early data was accepted. assert(!req.usingEarlyData); } - })().then(common.mustCall()); diff --git a/test/parallel/test-quic-quicsession-send-fd.js b/test/parallel/test-quic-quicsession-send-fd.js index c99d3580bd606d..6694052d864128 100644 --- a/test/parallel/test-quic-quicsession-send-fd.js +++ b/test/parallel/test-quic-quicsession-send-fd.js @@ -8,9 +8,10 @@ const assert = require('assert'); const { createQuicSocket } = require('net'); const { once } = require('events'); const fs = require('fs'); +const qlog = process.env.NODE_QLOG === '1'; const { key, cert, ca } = require('../common/quic'); -const options = { key, cert, ca, alpn: 'meow' }; +const options = { key, cert, ca, alpn: 'meow', qlog }; const variants = []; for (const variant of ['sendFD', 'sendFile', 'sendFD+fileHandle']) { @@ -26,18 +27,22 @@ for (const variant of ['sendFD', 'sendFile', 'sendFD+fileHandle']) { })().then(common.mustCall()); async function test({ variant, offset, length }) { - const server = createQuicSocket({ server: options }); - const client = createQuicSocket({ client: options }); + const server = createQuicSocket({ qlog, server: options }); + const client = createQuicSocket({ qlog, client: options }); let fd; server.on('session', common.mustCall(async (session) => { + if (qlog) { + session.qlog.pipe( + fs.createWriteStream(`server-${variant}-${offset}-${length}.qlog`)); + } + const stream = await session.openStream({ halfOpen: true }); // The data and end events won't emit because // the stream is never readable. stream.on('data', common.mustNotCall()); stream.on('end', common.mustNotCall()); - stream.on('finish', common.mustCall()); stream.on('close', common.mustCall()); @@ -63,6 +68,10 @@ async function test({ variant, offset, length }) { address: 'localhost', port: server.endpoints[0].address.port }); + if (qlog) { + req.qlog.pipe( + fs.createWriteStream(`client-${variant}-${offset}-${length}.qlog`)); + } req.on('stream', common.mustCall((stream) => { const data = []; @@ -72,14 +81,15 @@ async function test({ variant, offset, length }) { if (offset !== -1) expectedContent = expectedContent.slice(offset); if (length !== -1) expectedContent = expectedContent.slice(0, length); assert.deepStrictEqual(Buffer.concat(data), expectedContent); - - client.close(); - server.close(); if (fd !== undefined) { if (fd.close) fd.close().then(common.mustCall()); else fs.closeSync(fd); } })); + stream.on('close', common.mustCall(() => { + client.close(); + server.close(); + })); })); await Promise.all([ diff --git a/test/parallel/test-quic-quicstream-close-early.js b/test/parallel/test-quic-quicstream-close-early.js index 3c486db7c9ceeb..6e7b4f4379523e 100644 --- a/test/parallel/test-quic-quicstream-close-early.js +++ b/test/parallel/test-quic-quicstream-close-early.js @@ -11,10 +11,13 @@ const { key, cert, ca } = require('../common/quic'); const { once } = require('events'); const { createQuicSocket } = require('net'); -const options = { key, cert, ca, alpn: 'zzz' }; +const qlog = process.env.NODE_QLOG === '1'; +const { createWriteStream } = require('fs'); -const client = createQuicSocket({ client: options }); -const server = createQuicSocket({ server: options }); +const options = { key, cert, ca, alpn: 'zzz', qlog }; + +const client = createQuicSocket({ qlog, client: options }); +const server = createQuicSocket({ qlog, server: options }); const countdown = new Countdown(2, () => { server.close(); @@ -23,13 +26,22 @@ const countdown = new Countdown(2, () => { (async function() { server.on('session', common.mustCall(async (session) => { + if (qlog) session.qlog.pipe(createWriteStream('server.qlog')); const uni = await session.openStream({ halfOpen: true }); - uni.write('hi', common.expectsError()); - uni.on('error', common.mustCall()); + uni.write('hi', common.mustCall((err) => assert(!err))); + uni.on('error', common.mustNotCall()); uni.on('data', common.mustNotCall()); uni.on('close', common.mustCall()); - uni.close(3); - session.on('stream', common.mustNotCall()); + uni.close(); + + session.on('stream', common.mustCall((stream) => { + assert(stream.bidirectional); + assert(stream.readable); + assert(stream.writable); + stream.on('close', common.mustCall()); + stream.end(); + stream.resume(); + })); session.on('close', common.mustCall()); })); @@ -39,9 +51,12 @@ const countdown = new Countdown(2, () => { address: 'localhost', port: server.endpoints[0].address.port, }); + if (qlog) req.qlog.pipe(createWriteStream('client.qlog')); req.on('stream', common.mustCall((stream) => { - stream.on('abort', common.mustNotCall()); + assert(stream.unidirectional); + assert(stream.readable); + assert(!stream.writable); stream.on('data', common.mustCall((chunk) => { assert.strictEqual(chunk.toString(), 'hi'); })); @@ -52,14 +67,14 @@ const countdown = new Countdown(2, () => { })); const stream = await req.openStream(); - stream.write('hello', common.expectsError()); - stream.write('there', common.expectsError()); - stream.on('error', common.mustCall()); - stream.on('end', common.mustNotCall()); - stream.on('close', common.mustCall(() => { - countdown.dec(); - })); - stream.close(1); + stream.write('hello', common.mustCall((err) => assert(!err))); + stream.write('there', common.mustCall((err) => assert(!err))); + stream.resume(); + stream.on('error', common.mustNotCall()); + stream.on('end', common.mustCall()); + stream.on('close', common.mustCall()); + await stream.close(); + countdown.dec(); await Promise.all([ once(server, 'close'), diff --git a/test/parallel/test-quic-quicstream-destroy.js b/test/parallel/test-quic-quicstream-destroy.js index 8fb2f3fb1ec36a..3f0b0bf49e7237 100644 --- a/test/parallel/test-quic-quicstream-destroy.js +++ b/test/parallel/test-quic-quicstream-destroy.js @@ -38,11 +38,10 @@ const server = createQuicSocket({ server: options }); }); const stream = await req.openStream(); - stream.write('foo'); + stream.end('foo'); // Do not explicitly end the stream here. - stream.on('finish', common.mustNotCall()); - stream.on('data', common.mustNotCall()); + stream.resume(); stream.on('end', common.mustCall()); stream.on('close', common.mustCall(() => { diff --git a/test/parallel/test-quic-simple-server-bidi.js b/test/parallel/test-quic-simple-server-bidi.js new file mode 100644 index 00000000000000..0b12a9d5452d03 --- /dev/null +++ b/test/parallel/test-quic-simple-server-bidi.js @@ -0,0 +1,55 @@ +// Flags: --no-warnings +'use strict'; + +const common = require('../common'); +if (!common.hasQuic) + common.skip('missing quic'); + +const { key, cert, ca } = require('../common/quic'); + +const { createWriteStream } = require('fs'); +const { createQuicSocket } = require('net'); +const { strictEqual } = require('assert'); + +const qlog = process.env.NODE_QLOG === '1'; + +const options = { key, cert, ca, alpn: 'zzz', qlog }; + +const client = createQuicSocket({ qlog, client: options }); +const server = createQuicSocket({ qlog, server: options }); + +(async function() { + server.on('session', common.mustCall(async (session) => { + if (qlog) session.qlog.pipe(createWriteStream('server.qlog')); + const stream = await session.openStream(); + stream.resume(); + stream.write('from the '); + setTimeout(() => stream.end('server'), common.platformTimeout(10)); + + session.on('close', common.mustCall(() => { + server.close(); + })); + })); + + await server.listen(); + + const req = await client.connect({ + address: 'localhost', + port: server.endpoints[0].address.port + }); + if (qlog) req.qlog.pipe(createWriteStream('client.qlog')); + + req.on('stream', common.mustCall(async (stream) => { + let data = ''; + stream.setEncoding('utf8'); + + stream.end('foo'); + + for await (const chunk of stream) + data += chunk; + strictEqual(data, 'from the server'); + + await req.close(); + client.close(); + })); +})().then(common.mustCall()); diff --git a/test/parallel/test-quic-simple-server-uni.js b/test/parallel/test-quic-simple-server-uni.js new file mode 100644 index 00000000000000..19498f0f34cec7 --- /dev/null +++ b/test/parallel/test-quic-simple-server-uni.js @@ -0,0 +1,60 @@ +// Flags: --no-warnings +'use strict'; + +const common = require('../common'); +if (!common.hasQuic) + common.skip('missing quic'); + +const { key, cert, ca } = require('../common/quic'); + +const { createWriteStream } = require('fs'); +const { createQuicSocket } = require('net'); +const { strictEqual } = require('assert'); + +const qlog = process.env.NODE_QLOG === '1'; + +const options = { key, cert, ca, alpn: 'zzz', qlog }; + +const client = createQuicSocket({ qlog, client: options }); +const server = createQuicSocket({ qlog, server: options }); + +server.on('close', common.mustCall()); +client.on('close', common.mustCall()); + +(async function() { + server.on('session', common.mustCall(async (session) => { + if (qlog) session.qlog.pipe(createWriteStream('server.qlog')); + const stream = await session.openStream({ halfOpen: true }); + stream.write('from the '); + setTimeout(() => stream.end('server'), common.platformTimeout(10)); + stream.on('close', common.mustCall()); + session.on('close', common.mustCall(() => { + server.close(); + })); + })); + + await server.listen(); + + const req = await client.connect({ + address: 'localhost', + port: server.endpoints[0].address.port + }); + if (qlog) req.qlog.pipe(createWriteStream('client.qlog')); + + req.on('close', common.mustCall()); + + req.on('stream', common.mustCall(async (stream) => { + let data = ''; + stream.setEncoding('utf8'); + stream.on('close', common.mustCall()); + + for await (const chunk of stream) + data += chunk; + + strictEqual(data, 'from the server'); + + await req.close(); + + client.close(); + })); +})().then(common.mustCall());