Skip to content

Commit

Permalink
src: add interrupts to Environments/Workers
Browse files Browse the repository at this point in the history
Allow doing what V8’s `v8::Isolate::RequestInterrupt()` does for V8.
This also works when there is no JS code currently executing.

PR-URL: nodejs#31386
Refs: openjs-foundation/summit#240
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
addaleax authored and MylesBorins committed Apr 1, 2020
1 parent 3f9d1dd commit b0ce668
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,18 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {
uv_async_send(&task_queues_async_);
}

template <typename Fn>
void Environment::RequestInterrupt(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_interrupts_.Push(std::move(callback));
}
uv_async_send(&task_queues_async_);
RequestInterruptFromV8();
}

Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
: refed_(refed) {}

Expand Down
41 changes: 41 additions & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ Environment::Environment(IsolateData* isolate_data,
}

Environment::~Environment() {
if (interrupt_data_ != nullptr) *interrupt_data_ = nullptr;

isolate()->GetHeapProfiler()->RemoveBuildEmbedderGraphCallback(
BuildEmbedderGraph, this);

Expand Down Expand Up @@ -653,11 +655,29 @@ void Environment::AtExit(void (*cb)(void* arg), void* arg) {
at_exit_functions_.push_front(ExitCallback{cb, arg});
}

void Environment::RunAndClearInterrupts() {
while (native_immediates_interrupts_.size() > 0) {
NativeImmediateQueue queue;
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
queue.ConcatMove(std::move(native_immediates_interrupts_));
}
DebugSealHandleScope seal_handle_scope(isolate());

while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
head->Call(this);
}
}

void Environment::RunAndClearNativeImmediates(bool only_refed) {
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunAndClearNativeImmediates", this);
size_t ref_count = 0;

// Handle interrupts first. These functions are not allowed to throw
// exceptions, so we do not need to handle that.
RunAndClearInterrupts();

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe and this function being called.
// For the common case, it's worth checking the size first before establishing
Expand Down Expand Up @@ -697,6 +717,27 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
ToggleImmediateRef(false);
}

void Environment::RequestInterruptFromV8() {
if (interrupt_data_ != nullptr) return; // Already scheduled.

// The Isolate may outlive the Environment, so some logic to handle the
// situation in which the Environment is destroyed before the handler runs
// is required.
interrupt_data_ = new Environment*(this);

isolate()->RequestInterrupt([](Isolate* isolate, void* data) {
std::unique_ptr<Environment*> env_ptr { static_cast<Environment**>(data) };
Environment* env = *env_ptr;
if (env == nullptr) {
// The Environment has already been destroyed. That should be okay; any
// callback added before the Environment shuts down would have been
// handled during cleanup.
return;
}
env->interrupt_data_ = nullptr;
env->RunAndClearInterrupts();
}, interrupt_data_);
}

void Environment::ScheduleTimer(int64_t duration_ms) {
if (started_cleanup_) return;
Expand Down
10 changes: 10 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,12 @@ class Environment : public MemoryRetainer {
template <typename Fn>
// This behaves like SetImmediate() but can be called from any thread.
inline void SetImmediateThreadsafe(Fn&& cb);
// This behaves like V8's Isolate::RequestInterrupt(), but also accounts for
// the event loop (i.e. combines the V8 function with SetImmediate()).
// The passed callback may not throw exceptions.
// This function can be called from any thread.
template <typename Fn>
inline void RequestInterrupt(Fn&& cb);
// This needs to be available for the JS-land setImmediate().
void ToggleImmediateRef(bool ref);

Expand Down Expand Up @@ -1426,8 +1432,12 @@ class Environment : public MemoryRetainer {
NativeImmediateQueue native_immediates_;
Mutex native_immediates_threadsafe_mutex_;
NativeImmediateQueue native_immediates_threadsafe_;
NativeImmediateQueue native_immediates_interrupts_;

void RunAndClearNativeImmediates(bool only_refed = false);
void RunAndClearInterrupts();
Environment** interrupt_data_ = nullptr;
void RequestInterruptFromV8();
static void CheckImmediate(uv_check_t* handle);

// Use an unordered_set, so that we have efficient insertion and removal.
Expand Down
11 changes: 11 additions & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class Worker : public AsyncWrap {
tracker->TrackField("parent_port", parent_port_);
}

template <typename Fn>
inline bool RequestInterrupt(Fn&& cb);

SET_MEMORY_INFO_NAME(Worker)
SET_SELF_SIZE(Worker)

Expand Down Expand Up @@ -125,6 +128,14 @@ class Worker : public AsyncWrap {
friend class WorkerThreadData;
};

template <typename Fn>
bool Worker::RequestInterrupt(Fn&& cb) {
Mutex::ScopedLock lock(mutex_);
if (env_ == nullptr) return false;
env_->RequestInterrupt(std::move(cb));
return true;
}

} // namespace worker
} // namespace node

Expand Down

0 comments on commit b0ce668

Please sign in to comment.