Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: distinguish refed/unrefed threadsafe Immediates #33320

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/env-inl.h
Expand Up @@ -727,9 +727,9 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
}

template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
void Environment::SetImmediateThreadsafe(Fn&& cb, bool refed) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer an enum rather than a magical boolean :-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasnell Hm yeah … this reaches into the CallbackQueue implementation which also takes this as a boolean argument and returns the result as a boolean, so maybe in a follow-up PR? I’d like to be consistent here…

auto callback =
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
native_immediates_threadsafe_.CreateCallback(std::move(cb), refed);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
Expand Down
29 changes: 17 additions & 12 deletions src/env.cc
Expand Up @@ -743,19 +743,10 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
// exceptions, so we do not need to handle that.
RunAndClearInterrupts();

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe and this function being called.
// For the common case, it's worth checking the size first before establishing
// a mutex lock.
if (native_immediates_threadsafe_.size() > 0) {
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_));
}

auto drain_list = [&]() {
auto drain_list = [&](NativeImmediateQueue* queue) {
TryCatchScope try_catch(this);
DebugSealHandleScope seal_handle_scope(isolate());
while (auto head = native_immediates_.Shift()) {
while (auto head = queue->Shift()) {
if (head->is_refed())
ref_count++;

Expand All @@ -773,12 +764,26 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
}
return false;
};
while (drain_list()) {}
while (drain_list(&native_immediates_)) {}

immediate_info()->ref_count_dec(ref_count);

if (immediate_info()->ref_count() == 0)
ToggleImmediateRef(false);

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe immediate list and this function being
// called. For the common case, it's worth checking the size first before
// establishing a mutex lock.
// This is intentionally placed after the `ref_count` handling, because when
// refed threadsafe immediates are created, they are not counted towards the
// count in immediate_info() either.
NativeImmediateQueue threadsafe_immediates;
if (native_immediates_threadsafe_.size() > 0) {
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));
}
while (drain_list(&threadsafe_immediates)) {}
}

void Environment::RequestInterruptFromV8() {
Expand Down
2 changes: 1 addition & 1 deletion src/env.h
Expand Up @@ -1168,7 +1168,7 @@ class Environment : public MemoryRetainer {
inline void SetUnrefImmediate(Fn&& cb);
template <typename Fn>
// This behaves like SetImmediate() but can be called from any thread.
inline void SetImmediateThreadsafe(Fn&& cb);
inline void SetImmediateThreadsafe(Fn&& cb, bool refed = true);
// This behaves like V8's Isolate::RequestInterrupt(), but also accounts for
// the event loop (i.e. combines the V8 function with SetImmediate()).
// The passed callback may not throw exceptions.
Expand Down
2 changes: 1 addition & 1 deletion src/node_worker.cc
Expand Up @@ -754,7 +754,7 @@ void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
env, std::move(snapshot));
Local<Value> args[] = { stream->object() };
taker->MakeCallback(env->ondone_string(), arraysize(args), args);
});
}, /* refed */ false);
});
args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
}
Expand Down