Skip to content

Commit 28cb7e7

Browse files
addaleaxtargos
authored andcommittedApr 18, 2020
worker: improve MessagePort performance
Use a JS function as the single entry point for emitting `.onmessage()` calls, avoiding the overhead of manually constructing each message event object in C++. confidence improvement accuracy (*) (**) (***) worker/echo.js n=100000 sendsPerBroadcast=1 payload='object' workers=1 *** 16.34 % ±1.16% ±1.54% ±1.99% worker/echo.js n=100000 sendsPerBroadcast=1 payload='string' workers=1 *** 24.41 % ±1.50% ±1.99% ±2.58% worker/echo.js n=100000 sendsPerBroadcast=10 payload='object' workers=1 *** 26.66 % ±1.54% ±2.05% ±2.65% worker/echo.js n=100000 sendsPerBroadcast=10 payload='string' workers=1 *** 32.72 % ±1.60% ±2.11% ±2.73% worker/messageport.js n=1000000 payload='object' *** 40.28 % ±1.48% ±1.95% ±2.52% worker/messageport.js n=1000000 payload='string' *** 76.95 % ±2.19% ±2.90% ±3.75% Also fix handling exceptions returned from `MessagePort::New`. PR-URL: #31605 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Denys Otrishko <shishugi@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 3bf21b0 commit 28cb7e7

File tree

6 files changed

+61
-24
lines changed

6 files changed

+61
-24
lines changed
 
+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
'use strict';
2+
class MessageEvent {
3+
constructor(data, target) {
4+
this.data = data;
5+
this.target = target;
6+
}
7+
}
8+
9+
exports.emitMessage = function(data) {
10+
if (typeof this.onmessage === 'function')
11+
this.onmessage(new MessageEvent(data, this));
12+
};

‎node.gyp

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
'lib/internal/bootstrap/switches/is_not_main_thread.js',
3636
'lib/internal/per_context/primordials.js',
3737
'lib/internal/per_context/domexception.js',
38+
'lib/internal/per_context/messageport.js',
3839
'lib/async_hooks.js',
3940
'lib/assert.js',
4041
'lib/buffer.js',

‎src/api/environment.cc

+1
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ bool InitializeContextForSnapshot(Local<Context> context) {
433433

434434
static const char* context_files[] = {"internal/per_context/primordials",
435435
"internal/per_context/domexception",
436+
"internal/per_context/messageport",
436437
nullptr};
437438

438439
for (const char** module = context_files; *module != nullptr; module++) {

‎src/env.h

-1
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,6 @@ constexpr size_t kFsStatsBufferLength =
413413
V(http2stream_constructor_template, v8::ObjectTemplate) \
414414
V(http2ping_constructor_template, v8::ObjectTemplate) \
415415
V(libuv_stream_wrap_ctor_template, v8::FunctionTemplate) \
416-
V(message_event_object_template, v8::ObjectTemplate) \
417416
V(message_port_constructor_template, v8::FunctionTemplate) \
418417
V(pipe_constructor_template, v8::FunctionTemplate) \
419418
V(promise_wrap_template, v8::ObjectTemplate) \

‎src/node_messaging.cc

+41-22
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ using v8::Maybe;
2828
using v8::MaybeLocal;
2929
using v8::Nothing;
3030
using v8::Object;
31-
using v8::ObjectTemplate;
3231
using v8::SharedArrayBuffer;
3332
using v8::String;
3433
using v8::Symbol;
@@ -188,6 +187,20 @@ uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
188187

189188
namespace {
190189

190+
MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
191+
Isolate* isolate = context->GetIsolate();
192+
Local<Object> per_context_bindings;
193+
Local<Value> emit_message_val;
194+
if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
195+
!per_context_bindings->Get(context,
196+
FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
197+
.ToLocal(&emit_message_val)) {
198+
return MaybeLocal<Function>();
199+
}
200+
CHECK(emit_message_val->IsFunction());
201+
return emit_message_val.As<Function>();
202+
}
203+
191204
MaybeLocal<Function> GetDOMException(Local<Context> context) {
192205
Isolate* isolate = context->GetIsolate();
193206
Local<Object> per_context_bindings;
@@ -498,20 +511,31 @@ MessagePort::MessagePort(Environment* env,
498511
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
499512
channel->OnMessage();
500513
};
514+
501515
CHECK_EQ(uv_async_init(env->event_loop(),
502516
&async_,
503517
onmessage), 0);
504-
async_.data = static_cast<void*>(this);
518+
async_.data = nullptr; // Reset later to indicate success of the constructor.
519+
auto cleanup = OnScopeLeave([&]() {
520+
if (async_.data == nullptr) Close();
521+
});
505522

506523
Local<Value> fn;
507524
if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
508525
return;
509526

510527
if (fn->IsFunction()) {
511528
Local<Function> init = fn.As<Function>();
512-
USE(init->Call(context, wrap, 0, nullptr));
529+
if (init->Call(context, wrap, 0, nullptr).IsEmpty())
530+
return;
513531
}
514532

533+
Local<Function> emit_message_fn;
534+
if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
535+
return;
536+
emit_message_fn_.Reset(env->isolate(), emit_message_fn);
537+
538+
async_.data = static_cast<void*>(this);
515539
Debug(this, "Created message port");
516540
}
517541

@@ -559,6 +583,11 @@ MessagePort* MessagePort::New(
559583
return nullptr;
560584
MessagePort* port = new MessagePort(env, context, instance);
561585
CHECK_NOT_NULL(port);
586+
if (port->IsHandleClosing()) {
587+
// Construction failed with an exception.
588+
return nullptr;
589+
}
590+
562591
if (data) {
563592
port->Detach();
564593
port->data_ = std::move(data);
@@ -651,16 +680,8 @@ void MessagePort::OnMessage() {
651680
continue;
652681
}
653682

654-
Local<Object> event;
655-
Local<Value> cb_args[1];
656-
if (!env()->message_event_object_template()->NewInstance(context)
657-
.ToLocal(&event) ||
658-
event->Set(context, env()->data_string(), payload).IsNothing() ||
659-
event->Set(context, env()->target_string(), object()).IsNothing() ||
660-
(cb_args[0] = event, false) ||
661-
MakeCallback(env()->onmessage_string(),
662-
arraysize(cb_args),
663-
cb_args).IsEmpty()) {
683+
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
684+
if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
664685
// Re-schedule OnMessage() execution in case of failure.
665686
if (data_)
666687
TriggerAsync();
@@ -929,6 +950,7 @@ void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
929950

930951
void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
931952
tracker->TrackField("data", data_);
953+
tracker->TrackField("emit_message_fn", emit_message_fn_);
932954
}
933955

934956
Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
@@ -938,8 +960,6 @@ Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
938960
if (!templ.IsEmpty())
939961
return templ;
940962

941-
Isolate* isolate = env->isolate();
942-
943963
{
944964
Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
945965
m->SetClassName(env->message_port_constructor_string());
@@ -950,13 +970,6 @@ Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
950970
env->SetProtoMethod(m, "start", MessagePort::Start);
951971

952972
env->set_message_port_constructor_template(m);
953-
954-
Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
955-
event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
956-
Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
957-
e->Set(env->data_string(), Null(isolate));
958-
e->Set(env->target_string(), Null(isolate));
959-
env->set_message_event_object_template(e);
960973
}
961974

962975
return GetMessagePortConstructorTemplate(env);
@@ -975,7 +988,13 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
975988
Context::Scope context_scope(context);
976989

977990
MessagePort* port1 = MessagePort::New(env, context);
991+
if (port1 == nullptr) return;
978992
MessagePort* port2 = MessagePort::New(env, context);
993+
if (port2 == nullptr) {
994+
port1->Close();
995+
return;
996+
}
997+
979998
MessagePort::Entangle(port1, port2);
980999

9811000
args.This()->Set(context, env->port1_string(), port1->object())

‎src/node_messaging.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,16 @@ class MessagePortData : public MemoryRetainer {
132132
// the uv_async_t handle that is used to notify the current event loop of
133133
// new incoming messages.
134134
class MessagePort : public HandleWrap {
135-
public:
135+
private:
136136
// Create a new MessagePort. The `context` argument specifies the Context
137137
// instance that is used for creating the values emitted from this port.
138+
// This is called by MessagePort::New(), which is the public API used for
139+
// creating MessagePort instances.
138140
MessagePort(Environment* env,
139141
v8::Local<v8::Context> context,
140142
v8::Local<v8::Object> wrap);
143+
144+
public:
141145
~MessagePort() override;
142146

143147
// Create a new message port instance, optionally over an existing
@@ -206,6 +210,7 @@ class MessagePort : public HandleWrap {
206210
std::unique_ptr<MessagePortData> data_ = nullptr;
207211
bool receiving_messages_ = false;
208212
uv_async_t async_;
213+
v8::Global<v8::Function> emit_message_fn_;
209214

210215
friend class MessagePortData;
211216
};

0 commit comments

Comments
 (0)
Please sign in to comment.