/
node_quic_default_application.cc
181 lines (156 loc) · 4.75 KB
/
node_quic_default_application.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#include "debug_utils-inl.h"
#include "node_quic_buffer-inl.h"
#include "node_quic_default_application.h"
#include "node_quic_session-inl.h"
#include "node_quic_socket-inl.h"
#include "node_quic_stream-inl.h"
#include "node_quic_util-inl.h"
#include "node_sockaddr-inl.h"
#include <ngtcp2/ngtcp2.h>
#include <vector>
namespace node {
namespace quic {
namespace {
// Advances a vector array past `len` consumed bytes.
//
// Every leading entry whose length is less than or equal to the bytes still
// to be consumed is skipped in full. The first entry that is strictly longer
// is trimmed in place (its base pointer moves forward and its length shrinks
// by the leftover amount). On return, *pvec points at the first entry that
// was not fully skipped and *pcnt holds the number of entries from there on
// (0 if every entry was skipped).
void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
  ngtcp2_vec* cursor = *pvec;
  size_t left = *pcnt;

  // Skip over every entry that is completely covered by the consumed bytes.
  while (left > 0 && cursor->len <= len) {
    len -= cursor->len;
    ++cursor;
    --left;
  }

  // Whatever is left of `len` falls inside the current entry: trim its front.
  if (left > 0) {
    cursor->len -= len;
    cursor->base += len;
  }

  *pvec = cursor;
  *pcnt = left;
}
int IsEmpty(const ngtcp2_vec* vec, size_t cnt) {
size_t i;
for (i = 0; i < cnt && vec[i].len == 0; ++i) {}
return i == cnt;
}
} // anonymous namespace
// Forwards the owning session to the QuicApplication base class. The
// constructor body itself performs no additional work.
DefaultApplication::DefaultApplication(QuicSession* session)
    : QuicApplication(session) {}
// Performs one-time initialization: if needs_init() reports that
// initialization is still pending, a debug message is emitted and
// set_init_done() is called. The return value is whatever needs_init()
// reports afterwards.
//
// NOTE(review): the result is queried *after* set_init_done() has run. If
// set_init_done() clears the condition that needs_init() reports (presumed,
// not visible here), this returns false even when initialization just
// succeeded -- confirm that callers expect this.
bool DefaultApplication::Initialize() {
  const bool pending = needs_init();
  if (pending) {
    Debug(session(), "Default QUIC Application Initialized");
    set_init_done();
  }
  return needs_init();
}
// Looks up the stream with the given id on the session and, if it exists,
// asks it to schedule itself onto this application's stream_queue_. The debug
// message is emitted whether or not the stream was found.
void DefaultApplication::ScheduleStream(int64_t stream_id) {
  BaseObjectPtr<QuicStream> stream = session()->FindStream(stream_id);
  // stream_id is a signed 64-bit value, so it is formatted with PRId64
  // (matching ResumeStream) rather than the unsigned PRIu64.
  Debug(session(), "Scheduling stream %" PRId64, stream_id);
  if (LIKELY(stream))
    stream->Schedule(&stream_queue_);
}
// Looks up the stream with the given id on the session and, if it exists,
// asks it to unschedule itself. The debug message is emitted whether or not
// the stream was found.
void DefaultApplication::UnscheduleStream(int64_t stream_id) {
  BaseObjectPtr<QuicStream> stream = session()->FindStream(stream_id);
  // stream_id is a signed 64-bit value, so it is formatted with PRId64
  // (matching ResumeStream) rather than the unsigned PRIu64.
  Debug(session(), "Unscheduling stream %" PRId64, stream_id);
  if (LIKELY(stream))
    stream->Unschedule();
}
// Closes the stream with the given id. Nothing happens if the session has no
// such stream. Otherwise the stream is first unscheduled and the close is
// then forwarded to QuicApplication::StreamClose. An application error code
// of 0 is replaced with NGTCP2_APP_NOERROR before forwarding.
void DefaultApplication::StreamClose(
    int64_t stream_id,
    uint64_t app_error_code) {
  if (session()->HasStream(stream_id)) {
    const uint64_t effective_code =
        app_error_code == 0 ? NGTCP2_APP_NOERROR : app_error_code;
    UnscheduleStream(stream_id);
    QuicApplication::StreamClose(stream_id, effective_code);
  }
}
// Called when the stream with the given id has data to send: logs a debug
// message and then schedules the stream via ScheduleStream().
void DefaultApplication::ResumeStream(int64_t stream_id) {
  Debug(session(),
        "Stream %" PRId64 " has data to send",
        stream_id);
  ScheduleStream(stream_id);
}
// Delivers received stream data to the matching QuicStream, creating the
// stream first when the session does not know it yet. Every path through
// this function returns true.
//
// When the stream does not exist yet:
//   * if the session is gracefully closing, the stream is reset with
//     NGTCP2_ERR_CLOSING and no stream is created;
//   * if datalen is 0, the data is ignored and no stream is created;
//   * otherwise a new stream is created through the session.
bool DefaultApplication::ReceiveStreamData(
    uint32_t flags,
    int64_t stream_id,
    const uint8_t* data,
    size_t datalen,
    uint64_t offset) {
  // Make sure a QuicStream exists before handing the data over to it.
  Debug(session(), "Default QUIC Application receiving stream data");
  BaseObjectPtr<QuicStream> stream = session()->FindStream(stream_id);

  if (!stream) {
    // A session that is closing gracefully does not accept new streams;
    // reset the stream explicitly instead.
    const bool closing = session()->is_gracefully_closing();
    if (closing)
      session()->ResetStream(stream_id, NGTCP2_ERR_CLOSING);

    // Empty stream frames are a potential DOS vector (they would commit
    // resources for nothing), so a new stream is only created when there is
    // actual data. ngtcp2 should already guard against this; the check is
    // repeated here to be safe.
    if (closing || UNLIKELY(datalen == 0))
      return true;

    stream = session()->CreateStream(stream_id);
  }

  CHECK(stream);
  stream->ReceiveData(flags, data, datalen, offset);
  return true;
}
// Takes the next stream off stream_queue_ and pulls its pending outbound data
// into *stream_data.
//
// stream_data->stream and stream_data->id are set as soon as a stream is
// popped. The remaining fields (fin, count, remaining) are only written by
// the `next` callback below, and only for statuses that do not return early.
// This function returns 0 on every path, including when the queue is empty.
int DefaultApplication::GetStreamData(StreamData* stream_data) {
QuicStream* stream = stream_queue_.PopFront();
// If stream is nullptr, there are no streams with data pending.
if (stream == nullptr)
return 0;
stream_data->stream.reset(stream);
stream_data->id = stream->id();
// Callback handed to QuicStream::Pull. It captures stream_data and stream by
// reference. The `done` argument is not used in this callback.
// NOTE(review): the by-reference capture presumably relies on Pull invoking
// the callback before this function returns (OPTIONS_SYNC is passed below)
// -- confirm.
auto next = [&](
int status,
const ngtcp2_vec* data,
size_t count,
bob::Done done) {
switch (status) {
case bob::Status::STATUS_BLOCK:
// Fall through
case bob::Status::STATUS_WAIT:
// Fall through
case bob::Status::STATUS_EOS:
// For these three statuses stream_data is left untouched.
return;
case bob::Status::STATUS_END:
// Mark the final chunk, then continue below to record count/remaining.
stream_data->fin = 1;
}
// Reached for STATUS_END and for any status not listed in the switch.
stream_data->count = count;
if (count > 0) {
// Data was produced: put the stream back on the queue and record the
// total byte length of the returned vectors.
stream->Schedule(&stream_queue_);
stream_data->remaining = get_length(data, count);
} else {
stream_data->remaining = 0;
}
};
// Only pull when the stream has not reached end-of-stream; a negative result
// from Pull aborts via CHECK_GE.
// NOTE(review): Pull is given stream_data->data, while StreamCommit and
// ShouldSetFin read stream_data->buf; how the two relate is not visible
// here -- confirm against the StreamData definition.
if (LIKELY(!stream->is_eos())) {
CHECK_GE(stream->Pull(
std::move(next),
bob::Options::OPTIONS_SYNC,
stream_data->data,
arraysize(stream_data->data),
kMaxVectorCount), 0);
}
return 0;
}
// Records that `datalen` bytes of the stream data have been committed:
// decrements the remaining byte count, advances the buf/count vector window
// past the committed bytes via Consume(), and notifies the stream through
// Commit(). Aborts (CHECK) if stream_data carries no stream. Always returns
// true.
bool DefaultApplication::StreamCommit(
    StreamData* stream_data,
    size_t datalen) {
  StreamData& sd = *stream_data;
  CHECK(sd.stream);

  sd.remaining -= datalen;
  Consume(&sd.buf, &sd.count, datalen);
  sd.stream->Commit(datalen);

  return true;
}
// Returns true only when all three conditions hold: stream_data carries a
// stream, every entry in its buf/count vector window is empty, and the
// stream reports that it is no longer writable.
bool DefaultApplication::ShouldSetFin(const StreamData& stream_data) {
  const bool drained =
      stream_data.stream && IsEmpty(stream_data.buf, stream_data.count);
  return drained && !stream_data.stream->is_writable();
}
} // namespace quic
} // namespace node