/
node_quic_stream.h
403 lines (328 loc) · 14 KB
/
node_quic_stream.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
#ifndef SRC_QUIC_NODE_QUIC_STREAM_H_
#define SRC_QUIC_NODE_QUIC_STREAM_H_
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "memory_tracker.h"
#include "async_wrap.h"
#include "env.h"
#include "node_http_common.h"
#include "node_quic_state.h"
#include "node_quic_util.h"
#include "stream_base-inl.h"
#include "util-inl.h"
#include "v8.h"
#include <string>
#include <vector>
namespace node {
namespace quic {
class QuicSession;
class QuicStream;
class QuicApplication;
using QuicHeader = NgHeaderBase<QuicApplication>;
// Flag values that may accompany a set of headers on a QuicStream.
enum QuicStreamHeaderFlags : uint32_t {
  // No flags are set.
  QUICSTREAM_HEADER_FLAGS_NONE = 0x0,

  // The initial headers are terminal: once they have been
  // transmitted the stream should be closed. When the QUIC
  // Application does not support headers, this flag is ignored.
  QUICSTREAM_HEADER_FLAGS_TERMINAL = 0x1
};
// Identifies which kind of header block a QuicStream is handling.
// The numeric values are consecutive starting at zero.
enum QuicStreamHeadersKind : int {
  // No header block kind has been selected.
  QUICSTREAM_HEADERS_KIND_NONE = 0,
  // Informational headers.
  QUICSTREAM_HEADERS_KIND_INFORMATIONAL = 1,
  // Initial headers.
  QUICSTREAM_HEADERS_KIND_INITIAL = 2,
  // Trailing headers.
  QUICSTREAM_HEADERS_KIND_TRAILING = 3,
  // Headers associated with a push.
  QUICSTREAM_HEADERS_KIND_PUSH = 4
};
// X-macro enumerating every per-stream statistic. Each entry is
// V(ID, field_name, "label"): ID is used to build the index
// constant, field_name becomes the QuicStreamStats member, and the
// third argument is a descriptive label string.
#define STREAM_STATS(V)                                                       \
  V(CREATED_AT,      created_at,          "Created At")                       \
  V(RECEIVED_AT,     received_at,         "Last Received At")                 \
  V(ACKED_AT,        acked_at,            "Last Acknowledged At")             \
  V(CLOSING_AT,      closing_at,          "Closing At")                       \
  V(DESTROYED_AT,    destroyed_at,        "Destroyed At")                     \
  V(BYTES_RECEIVED,  bytes_received,      "Bytes Received")                   \
  V(BYTES_SENT,      bytes_sent,          "Bytes Sent")                       \
  V(MAX_OFFSET,      max_offset,          "Max Offset")                       \
  V(MAX_OFFSET_ACK,  max_offset_ack,      "Max Acknowledged Offset")          \
  V(MAX_OFFSET_RECV, max_offset_received, "Max Received Offset")              \
  V(FINAL_SIZE,      final_size,          "Final Size")
// Index constants for the stream statistics: one
// IDX_QUIC_STREAM_STATS_<ID> enumerator per STREAM_STATS entry, in
// list order, followed by IDX_QUIC_STREAM_STATS_COUNT which equals
// the number of entries.
#define V(id, field, label) IDX_QUIC_STREAM_STATS_##id,
enum QuicStreamStatsIdx : int {
  STREAM_STATS(V)
  IDX_QUIC_STREAM_STATS_COUNT
};
#undef V
// Plain aggregate holding the stream statistics: one uint64_t
// member per STREAM_STATS entry, named after the entry's second
// argument and declared in list order.
#define V(id, field, label) uint64_t field;
struct QuicStreamStats {
  STREAM_STATS(V)
};
#undef V
// Traits type supplied to StatsBase<> by QuicStream. It names the
// statistics struct and the owning object type, and declares the
// hook used to render the statistics.
struct QuicStreamStatsTraits {
  // The type that owns the statistics.
  using Base = QuicStream;
  // The struct in which the statistics are stored.
  using Stats = QuicStreamStats;

  // Declared here and defined elsewhere.
  // NOTE(review): presumably calls add_field once per statistic of
  // the given stream -- confirm against the definition.
  template <typename Fn>
  static void ToString(const Base& stream, Fn&& add_field);
};
// X-macro enumerating the boolean state flags of a QuicStream. Each
// entry is V(ID, name): ID builds the QUICSTREAM_FLAG_<ID> constant
// and name builds the is_<name>() / set_<name>() accessors.
#define QUICSTREAM_FLAGS(V)                                                   \
  V(READ_CLOSED,  read_closed)                                                \
  V(READ_STARTED, read_started)                                               \
  V(READ_PAUSED,  read_paused)                                                \
  V(FIN,          fin)                                                        \
  V(FIN_SENT,     fin_sent)                                                   \
  V(DESTROYED,    destroyed)
// Whether data may flow both ways or only one way on a QuicStream.
// NOTE(review): "BIRECTIONAL" is the spelling used by the existing
// identifier; it is kept as-is because other code refers to it.
enum QuicStreamDirection {
  // Readable and writable in both directions.
  QUIC_STREAM_BIRECTIONAL = 0,

  // Writable and readable in one direction only. Which direction
  // that is depends on the QuicStreamOrigin.
  QUIC_STREAM_UNIDIRECTIONAL = 1
};
// Which endpoint created a QuicStream.
enum QuicStreamOrigin {
  // Created by the server.
  QUIC_STREAM_SERVER = 0,

  // Created by the client.
  QUIC_STREAM_CLIENT = 1
};
// QuicStreams are simple data flows that, fortunately, do not
// require much. They may be:
//
// * Bidirectional or Unidirectional
// * Server or Client Initiated
//
// The flow direction and origin of the stream are important in
// determining the write and read state (Open or Closed). Specifically:
//
// A Unidirectional stream originating with the Server is:
//
// * Server Writable (Open) but not Client Writable (Closed)
// * Client Readable (Open) but not Server Readable (Closed)
//
// Likewise, a Unidirectional stream originating with the
// Client is:
//
// * Client Writable (Open) but not Server Writable (Closed)
// * Server Readable (Open) but not Client Readable (Closed)
//
// Bidirectional Stream States
// +------------+--------------+--------------------+---------------------+
// | | Initiated By | Initial Read State | Initial Write State |
// +------------+--------------+--------------------+---------------------+
// | On Server | Server | Open | Open |
// +------------+--------------+--------------------+---------------------+
// | On Server | Client | Open | Open |
// +------------+--------------+--------------------+---------------------+
// | On Client | Server | Open | Open |
// +------------+--------------+--------------------+---------------------+
// | On Client | Client | Open | Open |
// +------------+--------------+--------------------+---------------------+
//
// Unidirectional Stream States
// +------------+--------------+--------------------+---------------------+
// | | Initiated By | Initial Read State | Initial Write State |
// +------------+--------------+--------------------+---------------------+
// | On Server | Server | Closed | Open |
// +------------+--------------+--------------------+---------------------+
// | On Server | Client | Open | Closed |
// +------------+--------------+--------------------+---------------------+
// | On Client | Server | Open | Closed |
// +------------+--------------+--------------------+---------------------+
// | On Client | Client | Closed | Open |
// +------------+--------------+--------------------+---------------------+
//
// All data sent via the QuicStream is buffered internally until either
// receipt is acknowledged from the peer or attempts to send are abandoned.
//
// A QuicStream may be in a fully Closed (Read and Write) state but still
// have unacknowledged data in its outbound queue.
//
// A QuicStream is gracefully closed when (a) both Read and Write states
// are Closed, and (b) all queued data has been acknowledged.
//
// The JavaScript Writable side of the QuicStream may be shutdown before
// all pending queued data has been serialized to frames. During this state,
// no additional data may be queued to send.
//
// The Write state of a QuicStream will not be closed while there are still
// pending writes on the JavaScript side.
//
// The QuicStream may be forcefully closed immediately using destroy(err).
// This causes all queued data and pending JavaScript writes to be
// abandoned, and causes the QuicStream to be immediately closed at the
// ngtcp2 level.
class QuicStream : public AsyncWrap,
                   public bob::SourceImpl<ngtcp2_vec>,
                   public StreamBase,
                   public StatsBase<QuicStreamStatsTraits> {
 public:
  // Static setup hook, defined out of line.
  // NOTE(review): presumably installs the QuicStream bindings on
  // `target` -- confirm against the definition.
  static void Initialize(
      Environment* env,
      v8::Local<v8::Object> target,
      v8::Local<v8::Context> context);

  // Factory returning a BaseObjectPtr<QuicStream> for the given
  // session and stream id. push_id defaults to 0.
  static BaseObjectPtr<QuicStream> New(
      QuicSession* session,
      int64_t stream_id,
      int64_t push_id = 0);

  // Constructs a QuicStream for `session` wrapping the JavaScript
  // object `target`. push_id defaults to 0.
  QuicStream(
      QuicSession* session,
      v8::Local<v8::Object> target,
      int64_t stream_id,
      int64_t push_id = 0);

  ~QuicStream() override;

  std::string diagnostic_name() const override;

  // The numeric identifier of the QuicStream.
  int64_t id() const { return stream_id_; }

  // If the QuicStream is associated with a push promise,
  // the numeric identifier of the promise. Currently only
  // used by HTTP/3.
  int64_t push_id() const { return push_id_; }

  // The owning QuicSession, obtained from the weak session_
  // reference.
  // NOTE(review): presumably null once the session has gone
  // away -- confirm BaseObjectWeakPtr::get() semantics.
  QuicSession* session() const { return session_.get(); }

  // A QuicStream can be either uni- or bi-directional.
  inline QuicStreamDirection direction() const;

  // A QuicStream can be initiated by either the client
  // or the server.
  inline QuicStreamOrigin origin() const;

  // A QuicStream will not be writable if:
  //  - The streambuf_ is ended
  //  - It is a Unidirectional stream originating from the peer
  inline bool is_writable() const;

  // A QuicStream will not be readable if:
  //  - The QUICSTREAM_FLAG_READ_CLOSED flag is set or
  //  - It is a Unidirectional stream originating from the local peer.
  inline bool is_readable() const;

  // IsWriteFinished will return true if a final stream frame
  // has been sent and all data has been acknowledged (the
  // send buffer is empty).
  inline bool is_write_finished() const;

  // Specifies the kind of headers currently being processed.
  inline void set_headers_kind(QuicStreamHeadersKind kind);

  // Set the final size for the QuicStream. This only works
  // the first time it is called. Subsequent calls will be
  // ignored unless the subsequent size is greater than the
  // prior set size, in which case we have a bug and we'll
  // assert.
  inline void set_final_size(uint64_t final_size);

  // The final size is the maximum amount of data that has been
  // acknowledged to have been received for a QuicStream.
  uint64_t final_size() const {
    return GetStat(&QuicStreamStats::final_size);
  }

  // Marks the given data range as having been acknowledged.
  // This means that the data range may be released from
  // memory.
  void Acknowledge(uint64_t offset, size_t datalen);

  // Destroy the QuicStream and render it no longer usable.
  // `error` is optional and defaults to nullptr.
  void Destroy(QuicError* error = nullptr);

  // Buffers chunks of data to be written to the QUIC connection.
  int DoWrite(
      WriteWrap* req_wrap,
      uv_buf_t* bufs,
      size_t nbufs,
      uv_stream_t* send_handle) override;

  // Returns false if the header cannot be added. This will
  // typically only happen if a maximum number of headers
  // has been reached. Takes ownership of `header`.
  bool AddHeader(std::unique_ptr<QuicHeader> header);

  // Some QUIC applications support headers, others do not.
  // The following methods allow consistent handling of
  // headers at the QuicStream level regardless of the
  // protocol. For applications that do not support headers,
  // these are simply not used.
  inline void BeginHeaders(
      QuicStreamHeadersKind kind = QUICSTREAM_HEADERS_KIND_NONE);

  // Indicates an amount of unacknowledged data that has been
  // submitted to the QUIC connection.
  inline void Commit(size_t amount);

  // Counterpart of BeginHeaders(). push_id defaults to 0.
  inline void EndHeaders(int64_t push_id = 0);

  // Passes a chunk of data on to the QuicStream listener.
  void ReceiveData(
      uint32_t flags,
      const uint8_t* data,
      size_t datalen,
      uint64_t offset);

  // Resets the QUIC stream, sending a signal to the peer that
  // no additional data will be transmitted for this stream.
  inline void ResetStream(uint64_t app_error_code = NGTCP2_NO_ERROR);

  // NOTE(review): presumably signals the peer that incoming data on
  // this stream is no longer wanted -- confirm against the inline
  // definition.
  inline void StopSending(uint64_t app_error_code = NGTCP2_NO_ERROR);

  // Submits informational headers. Returns false if headers are not
  // supported on the underlying QuicApplication.
  inline bool SubmitInformation(v8::Local<v8::Array> headers);

  // Submits initial headers. Returns false if headers are not
  // supported on the underlying QuicApplication.
  inline bool SubmitHeaders(v8::Local<v8::Array> headers, uint32_t flags);

  // Submits trailing headers. Returns false if headers are not
  // supported on the underlying QuicApplication.
  inline bool SubmitTrailers(v8::Local<v8::Array> headers);

  // Submits a push with the given headers and returns a
  // BaseObjectPtr<QuicStream>.
  // NOTE(review): presumably the returned pointer is empty when the
  // push cannot be created -- confirm against the inline definition.
  inline BaseObjectPtr<QuicStream> SubmitPush(v8::Local<v8::Array> headers);

  // Required for StreamBase
  bool IsAlive() override;

  // Required for StreamBase
  bool IsClosing() override;

  // Required for StreamBase
  int ReadStart() override;

  // Required for StreamBase
  int ReadStop() override;

  // Generates an is_<name>() getter and a set_<name>(bool) setter for
  // every entry of QUICSTREAM_FLAGS. Each flag lives in flags_ at the
  // bit position given by its QUICSTREAM_FLAG_<ID> enumerator. The
  // mask is built from an unsigned literal so that the shift and the
  // complement are performed in the same unsigned domain as flags_.
#define V(id, name)                                                           \
  inline bool is_##name() const {                                             \
    return flags_ & (1U << QUICSTREAM_FLAG_##id);                             \
  }                                                                           \
  inline void set_##name(bool on = true) {                                    \
    if (on)                                                                   \
      flags_ |= (1U << QUICSTREAM_FLAG_##id);                                 \
    else                                                                      \
      flags_ &= ~(1U << QUICSTREAM_FLAG_##id);                                \
  }
  QUICSTREAM_FLAGS(V)
#undef V

  // Required for StreamBase
  int DoShutdown(ShutdownWrap* req_wrap) override;

  AsyncWrap* GetAsyncWrap() override { return this; }

  // The QuicState held by this stream.
  QuicState* quic_state() { return quic_state_.get(); }

  // Required for MemoryRetainer
  void MemoryInfo(MemoryTracker* tracker) const override;
  SET_MEMORY_INFO_NAME(QuicStream)
  SET_SELF_SIZE(QuicStream)

 protected:
  // Pull hook overriding the bob::SourceImpl<ngtcp2_vec> base.
  int DoPull(
      bob::Next<ngtcp2_vec> next,
      int options,
      ngtcp2_vec* data,
      size_t count,
      size_t max_count_hint) override;

 private:
  // WasEverWritable returns true if it is a bidirectional stream,
  // or a Unidirectional stream originating from the local peer.
  // If was_ever_writable() is false, then no stream frames should
  // ever be sent from the local peer, including final stream frames.
  inline bool was_ever_writable() const;

  // WasEverReadable returns true if it is a bidirectional stream,
  // or a Unidirectional stream originating from the remote
  // peer.
  inline bool was_ever_readable() const;

  // Updates the stream statistics for `datalen` bytes.
  // NOTE(review): which counters are touched is decided in the
  // out-of-line definition.
  void IncrementStats(size_t datalen);

  // Bit positions of the state flags stored in flags_, one per
  // QUICSTREAM_FLAGS entry, followed by the total count.
#define V(id, _) QUICSTREAM_FLAG_##id,
  enum QuicStreamStates : uint32_t {
    QUICSTREAM_FLAGS(V)
    QUICSTREAM_FLAG_COUNT
  };
#undef V

  // Every flag must map to a distinct bit of the 32-bit flags_ word.
  static_assert(QUICSTREAM_FLAG_COUNT <= 32,
                "QUICSTREAM_FLAGS has more entries than flags_ has bits");

  BaseObjectWeakPtr<QuicSession> session_;
  QuicBuffer streambuf_;

  int64_t stream_id_ = 0;
  int64_t push_id_ = 0;
  uint32_t flags_ = 0;
  size_t inbound_consumed_data_while_paused_ = 0;

  std::vector<std::unique_ptr<QuicHeader>> headers_;
  // Given a default so that, like the other scalar members, it never
  // holds an indeterminate value before it is first assigned.
  QuicStreamHeadersKind headers_kind_ = QUICSTREAM_HEADERS_KIND_NONE;
  size_t current_headers_length_ = 0;

  ListNode<QuicStream> stream_queue_;

  BaseObjectPtr<QuicState> quic_state_;

 public:
  // Linked List of QuicStream objects
  using Queue = ListHead<QuicStream, &QuicStream::stream_queue_>;

  // Schedule()/Unschedule() operate on a Queue via the stream_queue_
  // list node; both are defined inline elsewhere.
  inline void Schedule(Queue* queue);
  inline void Unschedule();
};
} // namespace quic
} // namespace node
#endif // NODE_WANT_INTERNALS
#endif // SRC_QUIC_NODE_QUIC_STREAM_H_