Skip to content

Commit

Permalink
src,stream: remove *Check*() calls from non-Initialize() functions
Browse files Browse the repository at this point in the history
There is no need to crash the process if any of these checks fail.

Signed-off-by: Darshan Sen <darshan.sen@postman.com>

PR-URL: #40425
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
RaisinTen authored and BethGriggs committed Nov 23, 2021
1 parent 63ab003 commit 3ac99a2
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 53 deletions.
26 changes: 16 additions & 10 deletions src/stream_base-inl.h
Expand Up @@ -149,9 +149,11 @@ int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {

const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(
env->context(),
env->error_string(), OneByteString(env->isolate(), msg)).Check();
if (req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).IsNothing()) {
return UV_EBUSY;
}
ClearError();
}

Expand Down Expand Up @@ -203,9 +205,11 @@ StreamWriteResult StreamBase::Write(

const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).Check();
if (req_wrap_obj->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), msg)).IsNothing()) {
return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} };
}
ClearError();
}

Expand Down Expand Up @@ -279,10 +283,12 @@ void StreamReq::Done(int status, const char* error_str) {
Environment* env = async_wrap->env();
if (error_str != nullptr) {
v8::HandleScope handle_scope(env->isolate());
async_wrap->object()->Set(env->context(),
env->error_string(),
OneByteString(env->isolate(), error_str))
.Check();
if (async_wrap->object()->Set(
env->context(),
env->error_string(),
OneByteString(env->isolate(), error_str)).IsNothing()) {
return;
}
}

OnDone(status);
Expand Down
73 changes: 46 additions & 27 deletions src/stream_base.cc
Expand Up @@ -106,31 +106,40 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(context, i * 2).ToLocalChecked();
Local<Value> chunk;
if (!chunks->Get(context, i * 2).ToLocal(&chunk))
return -1;

if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required

// String chunk
Local<String> string = chunk->ToString(context).ToLocalChecked();
enum encoding encoding = ParseEncoding(isolate,
chunks->Get(context, i * 2 + 1).ToLocalChecked());
Local<String> string;
if (!chunk->ToString(context).ToLocal(&string))
return -1;
Local<Value> next_chunk;
if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
return -1;
enum encoding encoding = ParseEncoding(isolate, next_chunk);
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535 &&
!StringBytes::Size(isolate, string, encoding).To(&chunk_size))
return 0;
else if (!StringBytes::StorageSize(isolate, string, encoding)
.To(&chunk_size))
return 0;
if ((encoding == UTF8 &&
string->Length() > 65535 &&
!StringBytes::Size(isolate, string, encoding).To(&chunk_size)) ||
!StringBytes::StorageSize(isolate, string, encoding)
.To(&chunk_size)) {
return -1;
}
storage_size += chunk_size;
}

if (storage_size > INT_MAX)
return UV_ENOBUFS;
} else {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(context, i).ToLocalChecked();
Local<Value> chunk;
if (!chunks->Get(context, i).ToLocal(&chunk))
return -1;
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
}
Expand All @@ -145,7 +154,9 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
offset = 0;
if (!all_buffers) {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(context, i * 2).ToLocalChecked();
Local<Value> chunk;
if (!chunks->Get(context, i * 2).ToLocal(&chunk))
return -1;

// Write buffer
if (Buffer::HasInstance(chunk)) {
Expand All @@ -160,9 +171,13 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
size_t str_size = (bs ? bs->ByteLength() : 0) - offset;

Local<String> string = chunk->ToString(context).ToLocalChecked();
enum encoding encoding = ParseEncoding(isolate,
chunks->Get(context, i * 2 + 1).ToLocalChecked());
Local<String> string;
if (!chunk->ToString(context).ToLocal(&string))
return -1;
Local<Value> next_chunk;
if (!chunks->Get(context, i * 2 + 1).ToLocal(&next_chunk))
return -1;
enum encoding encoding = ParseEncoding(isolate, next_chunk);
str_size = StringBytes::Write(isolate,
str_storage,
str_size,
Expand Down Expand Up @@ -207,9 +222,11 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
// Reference LibuvStreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).Check();
if (req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).IsNothing()) {
return -1;
}
}

StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
Expand All @@ -236,12 +253,12 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
// For UTF8 strings that are very long, go ahead and take the hit for
// computing their actual size, rather than tripling the storage.
size_t storage_size;
if (enc == UTF8 && string->Length() > 65535 &&
!StringBytes::Size(isolate, string, enc).To(&storage_size))
return 0;
else if (!StringBytes::StorageSize(isolate, string, enc)
.To(&storage_size))
return 0;
if ((enc == UTF8 &&
string->Length() > 65535 &&
!StringBytes::Size(isolate, string, enc).To(&storage_size)) ||
!StringBytes::StorageSize(isolate, string, enc).To(&storage_size)) {
return -1;
}

if (storage_size > INT_MAX)
return UV_ENOBUFS;
Expand Down Expand Up @@ -312,9 +329,11 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
// Reference LibuvStreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).Check();
if (req_wrap_obj->Set(env->context(),
env->handle_string(),
send_handle_obj).IsNothing()) {
return -1;
}
}

StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
Expand Down
6 changes: 6 additions & 0 deletions src/stream_base.h
Expand Up @@ -49,6 +49,8 @@ class StreamReq {
virtual AsyncWrap* GetAsyncWrap() = 0;
inline v8::Local<v8::Object> object();

// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
inline void Done(int status, const char* error_str = nullptr);
inline void Dispose();

Expand Down Expand Up @@ -326,13 +328,17 @@ class StreamBase : public StreamResource {
// subclasses are also `BaseObject`s.
Environment* stream_env() const { return env_; }

// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
// Shut down the current stream. This request can use an existing
// ShutdownWrap object (that was created in JS), or a new one will be created.
// Returns 1 in case of a synchronous completion, 0 in case of asynchronous
// completion, and a libuv error case in case of synchronous failure.
inline int Shutdown(
v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());

// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
// Write data to the current stream. This request can use an existing
// WriteWrap object (that was created in JS), or a new one will be created.
// This will first try to write synchronously using `DoTryWrite()`, then
Expand Down
31 changes: 22 additions & 9 deletions src/stream_pipe.cc
Expand Up @@ -33,14 +33,26 @@ StreamPipe::StreamPipe(StreamBase* source,
// In particular, this makes sure that they are garbage collected as a group,
// if that applies to the given streams (for example, Http2Streams use
// weak references).
obj->Set(env()->context(), env()->source_string(), source->GetObject())
.Check();
source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
.Check();
obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
.Check();
sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
.Check();
if (obj->Set(env()->context(),
env()->source_string(),
source->GetObject()).IsNothing()) {
return;
}
if (source->GetObject()->Set(env()->context(),
env()->pipe_target_string(),
obj).IsNothing()) {
return;
}
if (obj->Set(env()->context(),
env()->sink_string(),
sink->GetObject()).IsNothing()) {
return;
}
if (sink->GetObject()->Set(env()->context(),
env()->pipe_source_string(),
obj).IsNothing()) {
return;
}
}

StreamPipe::~StreamPipe() {
Expand Down Expand Up @@ -172,7 +184,8 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
Environment* env = pipe->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked();
if (pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).IsEmpty())
return;
stream()->RemoveStreamListener(this);
}
return;
Expand Down
7 changes: 6 additions & 1 deletion src/stream_pipe.h
Expand Up @@ -9,11 +9,14 @@ namespace node {

class StreamPipe : public AsyncWrap {
public:
StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);
~StreamPipe() override;

void Unpipe(bool is_in_deletion = false);

// TODO(RaisinTen): Just like MessagePort, add the following overload:
// static StreamPipe* New(StreamBase* source, StreamBase* sink,
// v8::Local<v8::Object> obj);
// so that we can indicate if there is a pending exception/termination.
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand All @@ -25,6 +28,8 @@ class StreamPipe : public AsyncWrap {
SET_SELF_SIZE(StreamPipe)

private:
StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);

inline StreamBase* source();
inline StreamBase* sink();

Expand Down
12 changes: 6 additions & 6 deletions src/stream_wrap.cc
Expand Up @@ -268,12 +268,12 @@ void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
CHECK_EQ(type, UV_UNKNOWN_HANDLE);
}

if (!pending_obj.IsEmpty()) {
object()
->Set(env()->context(),
env()->pending_handle_string(),
pending_obj.ToLocalChecked())
.Check();
Local<Object> local_pending_obj;
if (pending_obj.ToLocal(&local_pending_obj) &&
object()->Set(env()->context(),
env()->pending_handle_string(),
local_pending_obj).IsNothing()) {
return;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/stream_wrap.h
Expand Up @@ -105,6 +105,8 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {

// Callbacks for libuv
void OnUvAlloc(size_t suggested_size, uv_buf_t* buf);
// TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate
// if there is a pending exception/termination.
void OnUvRead(ssize_t nread, const uv_buf_t* buf);

static void AfterUvWrite(uv_write_t* req, int status);
Expand Down

0 comments on commit 3ac99a2

Please sign in to comment.