diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc index 94ba8604bd76c6..93b7ffeca9cd53 100644 --- a/src/stream_pipe.cc +++ b/src/stream_pipe.cc @@ -11,7 +11,10 @@ using v8::Function; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; using v8::HandleScope; +using v8::Just; using v8::Local; +using v8::Maybe; +using v8::Nothing; using v8::Object; using v8::Value; @@ -28,31 +31,6 @@ StreamPipe::StreamPipe(StreamBase* source, sink->PushStreamListener(&writable_listener_); uses_wants_write_ = sink->HasWantsWrite(); - - // Set up links between this object and the source/sink objects. - // In particular, this makes sure that they are garbage collected as a group, - // if that applies to the given streams (for example, Http2Streams use - // weak references). - if (obj->Set(env()->context(), - env()->source_string(), - source->GetObject()).IsNothing()) { - return; - } - if (source->GetObject()->Set(env()->context(), - env()->pipe_target_string(), - obj).IsNothing()) { - return; - } - if (obj->Set(env()->context(), - env()->sink_string(), - sink->GetObject()).IsNothing()) { - return; - } - if (sink->GetObject()->Set(env()->context(), - env()->pipe_source_string(), - obj).IsNothing()) { - return; - } } StreamPipe::~StreamPipe() { @@ -261,6 +239,38 @@ void StreamPipe::WritableListener::OnStreamRead(ssize_t nread, return previous_listener_->OnStreamRead(nread, buf); } +Maybe StreamPipe::New(StreamBase* source, + StreamBase* sink, + Local obj) { + std::unique_ptr stream_pipe(new StreamPipe(source, sink, obj)); + + // Set up links between this object and the source/sink objects. + // In particular, this makes sure that they are garbage collected as a group, + // if that applies to the given streams (for example, Http2Streams use + // weak references). + Environment* env = source->stream_env(); + if (obj->Set(env->context(), env->source_string(), source->GetObject()) + .IsNothing()) { + return Nothing(); + } + if (source->GetObject() + ->Set(env->context(), env->pipe_target_string(), obj) + .IsNothing()) { + return Nothing(); + } + if (obj->Set(env->context(), env->sink_string(), sink->GetObject()) + .IsNothing()) { + return Nothing(); + } + if (sink->GetObject() + ->Set(env->context(), env->pipe_source_string(), obj) + .IsNothing()) { + return Nothing(); + } + + return Just(stream_pipe.release()); +} + void StreamPipe::New(const FunctionCallbackInfo& args) { CHECK(args.IsConstructCall()); CHECK(args[0]->IsObject()); @@ -268,7 +278,7 @@ void StreamPipe::New(const FunctionCallbackInfo& args) { StreamBase* source = StreamBase::FromObject(args[0].As()); StreamBase* sink = StreamBase::FromObject(args[1].As()); - new StreamPipe(source, sink, args.This()); + if (StreamPipe::New(source, sink, args.This()).IsNothing()) return; } void StreamPipe::Start(const FunctionCallbackInfo& args) { diff --git a/src/stream_pipe.h b/src/stream_pipe.h index f411468294d3f0..54ca5bfdd04792 100644 --- a/src/stream_pipe.h +++ b/src/stream_pipe.h @@ -13,10 +13,9 @@ class StreamPipe : public AsyncWrap { void Unpipe(bool is_in_deletion = false); - // TODO(RaisinTen): Just like MessagePort, add the following overload: - // static StreamPipe* New(StreamBase* source, StreamBase* sink, - // v8::Local obj); - // so that we can indicate if there is a pending exception/termination. + static v8::Maybe New(StreamBase* source, + StreamBase* sink, + v8::Local obj); static void New(const v8::FunctionCallbackInfo& args); static void Start(const v8::FunctionCallbackInfo& args); static void Unpipe(const v8::FunctionCallbackInfo& args);