Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

report: add support for Workers #31386

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 21 additions & 0 deletions doc/api/report.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ is provided below for reference.
"address": "0x000055fc7b2cb180"
}
],
"workers": [],
"environmentVariables": {
"REMOTEHOST": "REMOVED",
"MANPATH": "/opt/rh/devtoolset-3/root/usr/share/man:",
Expand Down Expand Up @@ -577,4 +578,24 @@ NODE_OPTIONS="--experimental-report --report-uncaught-exception \
Specific API documentation can be found under
[`process API documentation`][] section.

## Interaction with Workers
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/31386
description: Workers are now included in the report.
-->

[`Worker`][] threads can create reports in the same way that the main thread
does.

Reports will include information on any Workers that are children of the current
thread as part of the `workers` section, with each Worker generating a report
in the standard report format.

The thread which is generating the report will wait for the reports from Worker
threads to finish. However, the latency for this will usually be low, as both
running JavaScript and the event loop are interrupted to generate the report.

[`process API documentation`]: process.html
[`Worker`]: worker_threads.html
111 changes: 84 additions & 27 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,6 @@ inline bool ImmediateInfo::has_outstanding() const {
return fields_[kHasOutstanding] == 1;
}

inline void ImmediateInfo::count_inc(uint32_t increment) {
fields_[kCount] += increment;
}

inline void ImmediateInfo::count_dec(uint32_t decrement) {
fields_[kCount] -= decrement;
}

inline void ImmediateInfo::ref_count_inc(uint32_t increment) {
fields_[kRefCount] += increment;
}
Expand Down Expand Up @@ -748,19 +740,51 @@ inline void IsolateData::set_options(
options_ = std::move(options);
}

template <typename Fn>
void Environment::CreateImmediate(Fn&& cb, bool ref) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), ref);
NativeImmediateCallback* prev_tail = native_immediate_callbacks_tail_;
std::unique_ptr<Environment::NativeImmediateCallback>
Environment::NativeImmediateQueue::Shift() {
std::unique_ptr<Environment::NativeImmediateCallback> ret = std::move(head_);
if (ret) {
head_ = ret->get_next();
if (!head_)
tail_ = nullptr; // The queue is now empty.
}
size_--;
return ret;
}

native_immediate_callbacks_tail_ = callback.get();
void Environment::NativeImmediateQueue::Push(
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
NativeImmediateCallback* prev_tail = tail_;

size_++;
tail_ = cb.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(callback));
prev_tail->set_next(std::move(cb));
else
head_ = std::move(cb);
}

void Environment::NativeImmediateQueue::ConcatMove(
NativeImmediateQueue&& other) {
size_ += other.size_;
if (tail_ != nullptr)
tail_->set_next(std::move(other.head_));
else
native_immediate_callbacks_head_ = std::move(callback);
head_ = std::move(other.head_);
tail_ = other.tail_;
other.tail_ = nullptr;
other.size_ = 0;
}

immediate_info()->count_inc(1);
size_t Environment::NativeImmediateQueue::size() const {
return size_.load();
}

template <typename Fn>
void Environment::CreateImmediate(Fn&& cb, bool ref) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), ref);
native_immediates_.Push(std::move(callback));
}

template <typename Fn>
Expand All @@ -777,6 +801,29 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
CreateImmediate(std::move(cb), false);
}

template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
}
uv_async_send(&task_queues_async_);
}

template <typename Fn>
void Environment::RequestInterrupt(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_interrupts_.Push(std::move(callback));
}
uv_async_send(&task_queues_async_);
RequestInterruptFromV8();
}

Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
: refed_(refed) {}

Expand Down Expand Up @@ -862,8 +909,26 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
sub_worker_contexts_.erase(context);
}

template <typename Fn>
inline void Environment::ForEachWorker(Fn&& iterator) {
for (worker::Worker* w : sub_worker_contexts_) iterator(w);
}

inline void Environment::add_refs(int64_t diff) {
task_queues_async_refs_ += diff;
CHECK_GE(task_queues_async_refs_, 0);
if (task_queues_async_refs_ == 0)
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
else
uv_ref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
}

inline bool Environment::is_stopping() const {
return thread_stopper_.is_stopped();
return is_stopping_.load();
}

inline void Environment::set_stopping(bool value) {
is_stopping_.store(value);
}

inline std::list<node_module>* Environment::extra_linked_bindings() {
Expand Down Expand Up @@ -1146,7 +1211,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
inline void Environment::RegisterFinalizationGroupForCleanup(
v8::Local<v8::FinalizationGroup> group) {
cleanup_finalization_groups_.emplace_back(isolate(), group);
uv_async_send(&cleanup_finalization_groups_async_);
uv_async_send(&task_queues_async_);
}

size_t CleanupHookCallback::Hash::operator()(
Expand Down Expand Up @@ -1183,14 +1248,6 @@ int64_t Environment::base_object_count() const {
return base_object_count_;
}

bool AsyncRequest::is_stopped() const {
return stopped_.load();
}

void AsyncRequest::set_stopped(bool flag) {
stopped_.store(flag);
}

#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
#define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName)
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)
Expand Down