Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src,worker: fix race of WorkerHeapSnapshotTaker #44745

Merged
merged 1 commit into from Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/base_object.h
Expand Up @@ -241,7 +241,9 @@ inline T* Unwrap(v8::Local<v8::Value> obj) {
// circumstances such as the GC or Environment cleanup.
// If weak, destruction behaviour is not affected, but the pointer will be
// reset to nullptr once the BaseObject is destroyed.
// The API matches std::shared_ptr closely.
// The API matches std::shared_ptr closely. However, this class is not thread
// safe, that is, we can't have different BaseObjectPtrImpl instances in
// different threads refering to the same BaseObject instance.
template <typename T, bool kIsWeak>
class BaseObjectPtrImpl final {
public:
Expand Down
47 changes: 34 additions & 13 deletions src/node_worker.cc
Expand Up @@ -771,28 +771,49 @@ void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
->NewInstance(env->context()).ToLocal(&wrap)) {
return;
}
BaseObjectPtr<WorkerHeapSnapshotTaker> taker =
MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap);

// The created WorkerHeapSnapshotTaker is an object owned by main
// thread's Isolate, it can not be accessed by worker thread
std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker =
std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>(
MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap));

// Interrupt the worker thread and take a snapshot, then schedule a call
// on the parent thread that turns that snapshot into a readable stream.
bool scheduled = w->RequestInterrupt([taker, env](Environment* worker_env) {
heap::HeapSnapshotPointer snapshot {
worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
heap::HeapSnapshotPointer snapshot{
worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot()};
CHECK(snapshot);

// Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker
// object.

env->SetImmediateThreadsafe(
[taker, snapshot = std::move(snapshot)](Environment* env) mutable {
[taker = std::move(taker),
snapshot = std::move(snapshot)](Environment* env) mutable {
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());

AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
BaseObjectPtr<AsyncWrap> stream = heap::CreateHeapSnapshotStream(
env, std::move(snapshot));
Local<Value> args[] = { stream->object() };
taker->MakeCallback(env->ondone_string(), arraysize(args), args);
}, CallbackFlags::kUnrefed);
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
BaseObjectPtr<AsyncWrap> stream =
heap::CreateHeapSnapshotStream(env, std::move(snapshot));
Local<Value> args[] = {stream->object()};
taker->get()->MakeCallback(
env->ondone_string(), arraysize(args), args);
// implicitly delete `taker`
},
CallbackFlags::kUnrefed);

// Now, the lambda is delivered to the main thread, as a result, the
// WorkerHeapSnapshotTaker object is delivered to the main thread, too.
});
args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());

if (scheduled) {
args.GetReturnValue().Set(wrap);
} else {
args.GetReturnValue().Set(Local<Object>());
}
}

void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
Expand Down