@@ -675,7 +675,7 @@ Http2Session::Http2Session(Environment* env,
675
675
ArrayBuffer::New (env->isolate (), js_fields_, kSessionUint8FieldCount );
676
676
Local<Uint8Array> uint8_arr =
677
677
Uint8Array::New (ab, 0 , kSessionUint8FieldCount );
678
- /* USE*/ (wrap->Set (env->context (), env->fields_string (), uint8_arr));
678
+ USE (wrap->Set (env->context (), env->fields_string (), uint8_arr));
679
679
}
680
680
}
681
681
@@ -702,6 +702,7 @@ Http2Session::~Http2Session() {
702
702
DEBUG_HTTP2SESSION (this , " freeing nghttp2 session" );
703
703
nghttp2_session_del (session_);
704
704
CHECK_EQ (current_nghttp2_memory_, 0 );
705
+ free (stream_buf_allocation_.base );
705
706
}
706
707
707
708
inline bool HasHttp2Observer (Environment* env) {
@@ -1211,18 +1212,31 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
1211
1212
1212
1213
stream->statistics_ .received_bytes += len;
1213
1214
1215
+ Local<ArrayBuffer> ab;
1216
+ if (session->stream_buf_ab_ .IsEmpty ()) {
1217
+ ab = ArrayBuffer::New (env->isolate (),
1218
+ session->stream_buf_allocation_ .base ,
1219
+ session->stream_buf_allocation_ .len ,
1220
+ v8::ArrayBufferCreationMode::kInternalized );
1221
+ session->stream_buf_allocation_ = uv_buf_init (nullptr , 0 );
1222
+ session->stream_buf_ab_ .Reset (env->isolate (), ab);
1223
+ } else {
1224
+ ab = session->stream_buf_ab_ .Get (env->isolate ());
1225
+ }
1226
+
1214
1227
// There is a single large array buffer for the entire data read from the
1215
1228
// network; create a slice of that array buffer and emit it as the
1216
1229
// received data buffer.
1217
- CHECK (! session->stream_buf_ab_ . IsEmpty () );
1218
- size_t offset = reinterpret_cast < const char *>(data) - session-> stream_buf_ ;
1230
+ size_t offset = data - reinterpret_cast < uint8_t *>( session->stream_buf_ . base );
1231
+
1219
1232
// Verify that the data offset is inside the current read buffer.
1220
- CHECK_LE (offset, session->stream_buf_size_ );
1233
+ CHECK_LE (offset, session->stream_buf_ .len );
1234
+ CHECK_LE (offset + len, session->stream_buf_ .len );
1221
1235
1222
- Local<Object> buf =
1223
- Buffer::New (env, session-> stream_buf_ab_ , offset, len).ToLocalChecked ();
1236
+ Local<Object> buffer =
1237
+ Buffer::New (env, ab , offset, len).ToLocalChecked ();
1224
1238
1225
- stream->EmitData (len, buf , Local<Object>());
1239
+ stream->EmitData (len, buffer , Local<Object>());
1226
1240
if (!stream->IsReading ())
1227
1241
stream->inbound_consumed_data_while_paused_ += len;
1228
1242
else
@@ -1841,83 +1855,82 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size,
1841
1855
uv_buf_t * buf,
1842
1856
void * ctx) {
1843
1857
Http2Session* session = static_cast <Http2Session*>(ctx);
1844
- CHECK_EQ (session->stream_buf_ , nullptr );
1845
- CHECK_EQ (session->stream_buf_size_ , 0 );
1846
- buf->base = session->stream_buf_ = Malloc (suggested_size);
1847
- buf->len = session->stream_buf_size_ = suggested_size;
1848
- session->IncrementCurrentSessionMemory (suggested_size);
1858
+ CHECK_EQ (session->stream_buf_ .base , nullptr );
1859
+ CHECK_EQ (session->stream_buf_ .len , 0 );
1860
+ *buf = uv_buf_init (Malloc (suggested_size), suggested_size);
1849
1861
}
1850
1862
1851
1863
// Callback used to receive inbound data from the i/o stream
1852
1864
void Http2Session::OnStreamReadImpl (ssize_t nread,
1853
- const uv_buf_t * buf ,
1865
+ const uv_buf_t * buf_ ,
1854
1866
uv_handle_type pending,
1855
1867
void * ctx) {
1856
1868
Http2Session* session = static_cast <Http2Session*>(ctx);
1857
1869
Http2Scope h2scope (session);
1858
1870
CHECK_NE (session->stream_ , nullptr );
1859
1871
DEBUG_HTTP2SESSION2 (session, " receiving %d bytes" , nread);
1872
+ CHECK_EQ (session->stream_buf_allocation_ .base , nullptr );
1873
+ CHECK (session->stream_buf_ab_ .IsEmpty ());
1874
+
1875
+ // Only pass data on if nread > 0
1860
1876
if (nread <= 0 ) {
1861
- free (session->stream_buf_ );
1877
+ if (buf_ != nullptr )
1878
+ free (buf_->base );
1862
1879
if (nread < 0 ) {
1863
1880
uv_buf_t tmp_buf = uv_buf_init (nullptr , 0 );
1864
1881
session->prev_read_cb_ .fn (nread,
1865
1882
&tmp_buf,
1866
1883
pending,
1867
1884
session->prev_read_cb_ .ctx );
1868
1885
}
1869
- } else {
1870
- // Only pass data on if nread > 0
1871
-
1872
- // Verify that currently: There is memory allocated into which
1873
- // the data has been read, and that memory buffer is at least as large
1874
- // as the amount of data we have read, but we have not yet made an
1875
- // ArrayBuffer out of it.
1876
- CHECK_NE (session->stream_buf_ , nullptr );
1877
- CHECK_EQ (session->stream_buf_ , buf->base );
1878
- CHECK_EQ (session->stream_buf_size_ , buf->len );
1879
- CHECK_GE (session->stream_buf_size_ , static_cast <size_t >(nread));
1880
- CHECK (session->stream_buf_ab_ .IsEmpty ());
1886
+ return ;
1887
+ }
1881
1888
1882
- Environment* env = session->env ();
1883
- Isolate* isolate = env->isolate ();
1884
- HandleScope scope (isolate);
1885
- Local<Context> context = env->context ();
1886
- Context::Scope context_scope (context);
1889
+ // Shrink to the actual amount of used data.
1890
+ uv_buf_t buf = *buf_;
1891
+ buf.base = Realloc (buf.base , nread);
1892
+
1893
+ session->IncrementCurrentSessionMemory (nread);
1894
+ OnScopeLeave on_scope_leave ([&]() {
1895
+ // Once finished handling this write, reset the stream buffer.
1896
+ // The memory has either been free()d or was handed over to V8.
1897
+ // We use `nread` instead of `buf.size()` here, because the buffer is
1898
+ // cleared as part of the `.ToArrayBuffer()` call below.
1899
+ session->DecrementCurrentSessionMemory (nread);
1900
+ session->stream_buf_ab_ .Reset ();
1901
+ free (session->stream_buf_allocation_ .base );
1902
+ session->stream_buf_allocation_ = uv_buf_init (nullptr , 0 );
1903
+ session->stream_buf_ = uv_buf_init (nullptr , 0 );
1904
+ });
1887
1905
1888
- // Create an array buffer for the read data. DATA frames will be emitted
1889
- // as slices of this array buffer to avoid having to copy memory.
1890
- session->stream_buf_ab_ =
1891
- ArrayBuffer::New (isolate,
1892
- session->stream_buf_ ,
1893
- session->stream_buf_size_ ,
1894
- v8::ArrayBufferCreationMode::kInternalized );
1895
-
1896
- uv_buf_t buf_ = uv_buf_init (buf->base , nread);
1897
- session->statistics_ .data_received += nread;
1898
- ssize_t ret = session->Write (&buf_, 1 );
1899
-
1900
- // Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
1901
- // ssize_t to int. Cast here so that the < 0 check actually works on
1902
- // Windows.
1903
- if (static_cast <int >(ret) < 0 ) {
1904
- DEBUG_HTTP2SESSION2 (session, " fatal error receiving data: %d" , ret);
1905
-
1906
- Local<Value> argv[1 ] = {
1907
- Integer::New (isolate, ret),
1908
- };
1909
- session->MakeCallback (env->error_string (), arraysize (argv), argv);
1910
- } else {
1911
- session->MaybeStopReading ();
1912
- }
1913
- }
1906
+ // Make sure that there was no read previously active.
1907
+ CHECK_EQ (session->stream_buf_ .base , nullptr );
1908
+ CHECK_EQ (session->stream_buf_ .len , 0 );
1909
+
1910
+ // Remember the current buffer, so that OnDataChunkReceived knows the
1911
+ // offset of a DATA frame's data into the socket read buffer.
1912
+ session->stream_buf_ = uv_buf_init (buf.base , nread);
1914
1913
1915
- // Since we are finished handling this write, reset the stream buffer.
1916
- // The memory has either been free()d or was handed over to V8.
1917
- session->DecrementCurrentSessionMemory (session->stream_buf_size_ );
1918
- session->stream_buf_ = nullptr ;
1919
- session->stream_buf_size_ = 0 ;
1920
- session->stream_buf_ab_ = Local<ArrayBuffer>();
1914
+ // Verify that currently: There is memory allocated into which
1915
+ // the data has been read, and that memory buffer is at least as large
1916
+ // as the amount of data we have read, but we have not yet made an
1917
+ // ArrayBuffer out of it.
1918
+ CHECK_LE (static_cast <size_t >(nread), session->stream_buf_ .len );
1919
+
1920
+ // Store this so we can create an ArrayBuffer for read data from it.
1921
+ // DATA frames will be emitted as slices of that ArrayBuffer to avoid having
1922
+ // to copy memory.
1923
+ session->stream_buf_allocation_ = buf;
1924
+
1925
+ session->statistics_ .data_received += nread;
1926
+ ssize_t ret = session->Write (&session->stream_buf_ , 1 );
1927
+
1928
+ if (UNLIKELY (ret < 0 )) {
1929
+ DEBUG_HTTP2SESSION2 (session, " fatal error receiving data: %d" , ret);
1930
+ Local<Value> arg = Integer::New (session->env ()->isolate (), ret);
1931
+ session->MakeCallback (session->env ()->error_string (), 1 , &arg);
1932
+ return ;
1933
+ }
1921
1934
}
1922
1935
1923
1936
void Http2Session::OnStreamDestructImpl (void * ctx) {
0 commit comments