Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: remove usage of AllocatedBuffer from stream_* #40293

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/stream_base-inl.h
Expand Up @@ -3,7 +3,6 @@

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "allocated_buffer-inl.h"
#include "async_wrap-inl.h"
#include "base_object-inl.h"
#include "node.h"
Expand Down Expand Up @@ -270,9 +269,9 @@ ShutdownWrap* ShutdownWrap::FromObject(
return FromObject(base_obj->object());
}

void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) {
CHECK_NULL(storage_.data());
storage_ = std::move(storage);
void WriteWrap::SetBackingStore(std::unique_ptr<v8::BackingStore> bs) {
CHECK(!backing_store_);
backing_store_ = std::move(bs);
}

void StreamReq::Done(int status, const char* error_str) {
Expand Down
89 changes: 49 additions & 40 deletions src/stream_base.cc
Expand Up @@ -19,6 +19,7 @@ namespace node {

using v8::Array;
using v8::ArrayBuffer;
using v8::BackingStore;
using v8::ConstructorBehavior;
using v8::Context;
using v8::DontDelete;
Expand All @@ -29,6 +30,7 @@ using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Integer;
using v8::Isolate;
using v8::Local;
using v8::MaybeLocal;
using v8::Object;
Expand Down Expand Up @@ -80,6 +82,8 @@ void StreamBase::SetWriteResult(const StreamWriteResult& res) {

int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();
Local<Context> context = env->context();

CHECK(args[0]->IsObject());
CHECK(args[1]->IsArray());
Expand All @@ -102,21 +106,21 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
Local<Value> chunk = chunks->Get(context, i * 2).ToLocalChecked();
RaisinTen marked this conversation as resolved.
Show resolved Hide resolved

if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required

// String chunk
Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
Local<String> string = chunk->ToString(context).ToLocalChecked();
enum encoding encoding = ParseEncoding(isolate,
chunks->Get(context, i * 2 + 1).ToLocalChecked());
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535 &&
!StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
!StringBytes::Size(isolate, string, encoding).To(&chunk_size))
return 0;
else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
else if (!StringBytes::StorageSize(isolate, string, encoding)
.To(&chunk_size))
return 0;
storage_size += chunk_size;
Expand All @@ -126,20 +130,22 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
return UV_ENOBUFS;
} else {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
Local<Value> chunk = chunks->Get(context, i).ToLocalChecked();
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
}
}

AllocatedBuffer storage;
if (storage_size > 0)
storage = AllocatedBuffer::AllocateManaged(env, storage_size);
std::unique_ptr<BackingStore> bs;
if (storage_size > 0) {
NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
}

offset = 0;
if (!all_buffers) {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
Local<Value> chunk = chunks->Get(context, i * 2).ToLocalChecked();

// Write buffer
if (Buffer::HasInstance(chunk)) {
Expand All @@ -150,13 +156,14 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {

// Write string
CHECK_LE(offset, storage_size);
char* str_storage = storage.data() + offset;
size_t str_size = storage.size() - offset;

Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
str_size = StringBytes::Write(env->isolate(),
char* str_storage =
static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
size_t str_size = (bs ? bs->ByteLength() : 0) - offset;

Local<String> string = chunk->ToString(context).ToLocalChecked();
enum encoding encoding = ParseEncoding(isolate,
chunks->Get(context, i * 2 + 1).ToLocalChecked());
str_size = StringBytes::Write(isolate,
str_storage,
str_size,
string,
Expand All @@ -169,9 +176,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {

StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
SetWriteResult(res);
if (res.wrap != nullptr && storage_size > 0) {
res.wrap->SetAllocatedStorage(std::move(storage));
}
if (res.wrap != nullptr && storage_size > 0)
res.wrap->SetBackingStore(std::move(bs));
return res.err;
}

Expand Down Expand Up @@ -216,6 +222,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
template <enum encoding enc>
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();
CHECK(args[0]->IsObject());
CHECK(args[1]->IsString());

Expand All @@ -230,9 +237,9 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
// computing their actual size, rather than tripling the storage.
size_t storage_size;
if (enc == UTF8 && string->Length() > 65535 &&
!StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
!StringBytes::Size(isolate, string, enc).To(&storage_size))
return 0;
else if (!StringBytes::StorageSize(env->isolate(), string, enc)
else if (!StringBytes::StorageSize(isolate, string, enc)
.To(&storage_size))
return 0;

Expand All @@ -248,7 +255,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
bool try_write = storage_size <= sizeof(stack_storage) &&
(!IsIPCPipe() || send_handle_obj.IsEmpty());
if (try_write) {
data_size = StringBytes::Write(env->isolate(),
data_size = StringBytes::Write(isolate,
stack_storage,
storage_size,
string,
Expand All @@ -274,26 +281,28 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
CHECK_EQ(count, 1);
}

AllocatedBuffer data;
std::unique_ptr<BackingStore> bs;

if (try_write) {
// Copy partial data
data = AllocatedBuffer::AllocateManaged(env, buf.len);
memcpy(data.data(), buf.base, buf.len);
NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
data_size = buf.len;
} else {
// Write it
data = AllocatedBuffer::AllocateManaged(env, storage_size);
data_size = StringBytes::Write(env->isolate(),
data.data(),
NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
data_size = StringBytes::Write(isolate,
static_cast<char*>(bs->Data()),
storage_size,
string,
enc);
}

CHECK_LE(data_size, storage_size);

buf = uv_buf_init(data.data(), data_size);
buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);

uv_stream_t* send_handle = nullptr;

Expand All @@ -312,9 +321,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
res.bytes += synchronously_written;

SetWriteResult(res);
if (res.wrap != nullptr) {
res.wrap->SetAllocatedStorage(std::move(data));
}
if (res.wrap != nullptr)
res.wrap->SetBackingStore(std::move(bs));

return res.err;
}
Expand Down Expand Up @@ -511,27 +519,28 @@ void StreamResource::ClearError() {
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
CHECK_NOT_NULL(stream_);
Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
return AllocatedBuffer::AllocateManaged(env, suggested_size).release();
return env->allocate_managed_buffer(suggested_size);
}

void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
CHECK_NOT_NULL(stream_);
StreamBase* stream = static_cast<StreamBase*>(stream_);
Environment* env = stream->stream_env();
HandleScope handle_scope(env->isolate());
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
AllocatedBuffer buf(env, buf_);
std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);

if (nread <= 0) {
if (nread < 0)
stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
return;
}

CHECK_LE(static_cast<size_t>(nread), buf.size());
buf.Resize(nread);
CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
bs = BackingStore::Reallocate(isolate, std::move(bs), nread);

stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs)));
}


Expand Down
5 changes: 2 additions & 3 deletions src/stream_base.h
Expand Up @@ -4,7 +4,6 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "env.h"
#include "allocated_buffer.h"
#include "async_wrap.h"
#include "node.h"
#include "util.h"
Expand Down Expand Up @@ -90,7 +89,7 @@ class ShutdownWrap : public StreamReq {

class WriteWrap : public StreamReq {
public:
inline void SetAllocatedStorage(AllocatedBuffer&& storage);
inline void SetBackingStore(std::unique_ptr<v8::BackingStore> bs);

inline WriteWrap(
StreamBase* stream,
Expand All @@ -105,7 +104,7 @@ class WriteWrap : public StreamReq {
void OnDone(int status) override;

private:
AllocatedBuffer storage_;
std::unique_ptr<v8::BackingStore> backing_store_;
};


Expand Down
15 changes: 8 additions & 7 deletions src/stream_pipe.cc
@@ -1,11 +1,11 @@
#include "stream_pipe.h"
#include "allocated_buffer-inl.h"
#include "stream_base-inl.h"
#include "node_buffer.h"
#include "util-inl.h"

namespace node {

using v8::BackingStore;
using v8::Context;
using v8::Function;
using v8::FunctionCallbackInfo;
Expand Down Expand Up @@ -118,13 +118,13 @@ uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
size_t size = std::min(suggested_size, pipe->wanted_data_);
CHECK_GT(size, 0);
return AllocatedBuffer::AllocateManaged(pipe->env(), size).release();
return pipe->env()->allocate_managed_buffer(size);
}

void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
const uv_buf_t& buf_) {
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
AllocatedBuffer buf(pipe->env(), buf_);
std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_);
if (nread < 0) {
// EOF or error; stop reading and pass the error to the previous listener
// (which might end up in JS).
Expand All @@ -144,19 +144,20 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
return;
}

pipe->ProcessData(nread, std::move(buf));
pipe->ProcessData(nread, std::move(bs));
}

void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
void StreamPipe::ProcessData(size_t nread,
std::unique_ptr<BackingStore> bs) {
CHECK(uses_wants_write_ || pending_writes_ == 0);
uv_buf_t buffer = uv_buf_init(buf.data(), nread);
uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread);
StreamWriteResult res = sink()->Write(&buffer, 1);
pending_writes_++;
if (!res.async) {
writable_listener_.OnStreamAfterWrite(nullptr, res.err);
} else {
is_reading_ = false;
res.wrap->SetAllocatedStorage(std::move(buf));
res.wrap->SetBackingStore(std::move(bs));
if (source() != nullptr)
source()->ReadStop();
}
Expand Down
3 changes: 1 addition & 2 deletions src/stream_pipe.h
Expand Up @@ -4,7 +4,6 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "stream_base.h"
#include "allocated_buffer.h"

namespace node {

Expand Down Expand Up @@ -43,7 +42,7 @@ class StreamPipe : public AsyncWrap {
// `OnStreamWantsWrite()` support.
size_t wanted_data_ = 0;

void ProcessData(size_t nread, AllocatedBuffer&& buf);
void ProcessData(size_t nread, std::unique_ptr<v8::BackingStore> bs);

class ReadableListener : public StreamListener {
public:
Expand Down