Skip to content

Commit

Permalink
worker: add support for worker name in inspector and trace_events
Browse files Browse the repository at this point in the history
Fixes: #41589
PR-URL: #46832
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Darshan Sen <raisinten@gmail.com>
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
  • Loading branch information
debadree25 authored and danielleadams committed Apr 11, 2023
1 parent 09c5e6a commit f7423bd
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 30 deletions.
7 changes: 7 additions & 0 deletions doc/api/worker_threads.md
Expand Up @@ -900,6 +900,10 @@ if (isMainThread) {
<!-- YAML
added: v10.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46832
description: Added support for a `name` option, which allows
adding a name to worker title for debugging.
- version: v14.9.0
pr-url: https://github.com/nodejs/node/pull/34584
description: The `filename` parameter can be a WHATWG `URL` object using
Expand Down Expand Up @@ -998,6 +1002,9 @@ changes:
used for generated code.
* `stackSizeMb` {number} The default maximum stack size for the thread.
Small values may lead to unusable Worker instances. **Default:** `4`.
* `name` {string} An optional `name` to be appended to the worker title
for debuggin/identification purposes, making the final title as
`[worker ${id}] ${name}`. **Default:** `''`.

### Event: `'error'`

Expand Down
12 changes: 10 additions & 2 deletions lib/internal/worker.js
Expand Up @@ -17,6 +17,7 @@ const {
SafeArrayIterator,
SafeMap,
String,
StringPrototypeTrim,
Symbol,
SymbolFor,
TypedArrayPrototypeFill,
Expand Down Expand Up @@ -56,7 +57,7 @@ const {
const { deserializeError } = require('internal/error_serdes');
const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url');
const { kEmptyObject } = require('internal/util');
const { validateArray } = require('internal/validators');
const { validateArray, validateString } = require('internal/validators');

const {
ownsProcessState,
Expand Down Expand Up @@ -184,12 +185,19 @@ class Worker extends EventEmitter {
options.env);
}

let name = '';
if (options.name) {
validateString(options.name, 'options.name');
name = StringPrototypeTrim(options.name);
}

// Set up the C++ handle for the worker, as well as some internal wiring.
this[kHandle] = new WorkerImpl(url,
env === process.env ? null : env,
options.execArgv,
parseResourceLimits(options.resourceLimits),
!!(options.trackUnmanagedFds ?? true));
!!(options.trackUnmanagedFds ?? true),
name);
if (this[kHandle].invalidExecArgv) {
throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
}
Expand Down
8 changes: 7 additions & 1 deletion src/api/environment.cc
Expand Up @@ -453,11 +453,17 @@ NODE_EXTERN std::unique_ptr<InspectorParentHandle> GetInspectorParentHandle(
Environment* env,
ThreadId thread_id,
const char* url) {
return GetInspectorParentHandle(env, thread_id, url, "");
}

NODE_EXTERN std::unique_ptr<InspectorParentHandle> GetInspectorParentHandle(
Environment* env, ThreadId thread_id, const char* url, const char* name) {
CHECK_NOT_NULL(env);
if (name == nullptr) name = "";
CHECK_NE(thread_id.id, static_cast<uint64_t>(-1));
#if HAVE_INSPECTOR
return std::make_unique<InspectorParentHandleImpl>(
env->inspector_agent()->GetParentHandle(thread_id.id, url));
env->inspector_agent()->GetParentHandle(thread_id.id, url, name));
#else
return {};
#endif
Expand Down
23 changes: 14 additions & 9 deletions src/inspector/worker_inspector.cc
Expand Up @@ -14,18 +14,20 @@ class WorkerStartedRequest : public Request {
uint64_t id,
const std::string& url,
std::shared_ptr<node::inspector::MainThreadHandle> worker_thread,
bool waiting)
bool waiting,
const std::string& name)
: id_(id),
info_(BuildWorkerTitle(id), url, worker_thread),
info_(BuildWorkerTitle(id, name), url, worker_thread),
waiting_(waiting) {}
void Call(MainThreadInterface* thread) override {
auto manager = thread->inspector_agent()->GetWorkerManager();
manager->WorkerStarted(id_, info_, waiting_);
}

private:
static std::string BuildWorkerTitle(int id) {
return "Worker " + std::to_string(id);
static std::string BuildWorkerTitle(int id, const std::string& name) {
return "[worker " + std::to_string(id) + "]" +
(name == "" ? "" : " " + name);
}

uint64_t id_;
Expand Down Expand Up @@ -57,11 +59,13 @@ ParentInspectorHandle::ParentInspectorHandle(
uint64_t id,
const std::string& url,
std::shared_ptr<MainThreadHandle> parent_thread,
bool wait_for_connect)
bool wait_for_connect,
const std::string& name)
: id_(id),
url_(url),
parent_thread_(parent_thread),
wait_(wait_for_connect) {}
wait_(wait_for_connect),
name_(name) {}

ParentInspectorHandle::~ParentInspectorHandle() {
parent_thread_->Post(
Expand All @@ -71,7 +75,7 @@ ParentInspectorHandle::~ParentInspectorHandle() {
void ParentInspectorHandle::WorkerStarted(
std::shared_ptr<MainThreadHandle> worker_thread, bool waiting) {
std::unique_ptr<Request> request(
new WorkerStartedRequest(id_, url_, worker_thread, waiting));
new WorkerStartedRequest(id_, url_, worker_thread, waiting, name_));
parent_thread_->Post(std::move(request));
}

Expand All @@ -97,9 +101,10 @@ void WorkerManager::WorkerStarted(uint64_t session_id,
}

std::unique_ptr<ParentInspectorHandle> WorkerManager::NewParentHandle(
uint64_t thread_id, const std::string& url) {
uint64_t thread_id, const std::string& url, const std::string& name) {
bool wait = !delegates_waiting_on_start_.empty();
return std::make_unique<ParentInspectorHandle>(thread_id, url, thread_, wait);
return std::make_unique<ParentInspectorHandle>(
thread_id, url, thread_, wait, name);
}

void WorkerManager::RemoveAttachDelegate(int id) {
Expand Down
14 changes: 7 additions & 7 deletions src/inspector/worker_inspector.h
Expand Up @@ -56,14 +56,13 @@ class ParentInspectorHandle {
ParentInspectorHandle(uint64_t id,
const std::string& url,
std::shared_ptr<MainThreadHandle> parent_thread,
bool wait_for_connect);
bool wait_for_connect,
const std::string& name);
~ParentInspectorHandle();
std::unique_ptr<ParentInspectorHandle> NewParentInspectorHandle(
uint64_t thread_id, const std::string& url) {
return std::make_unique<ParentInspectorHandle>(thread_id,
url,
parent_thread_,
wait_);
uint64_t thread_id, const std::string& url, const std::string& name) {
return std::make_unique<ParentInspectorHandle>(
thread_id, url, parent_thread_, wait_, name);
}
void WorkerStarted(std::shared_ptr<MainThreadHandle> worker_thread,
bool waiting);
Expand All @@ -80,6 +79,7 @@ class ParentInspectorHandle {
std::string url_;
std::shared_ptr<MainThreadHandle> parent_thread_;
bool wait_;
std::string name_;
};

class WorkerManager : public std::enable_shared_from_this<WorkerManager> {
Expand All @@ -88,7 +88,7 @@ class WorkerManager : public std::enable_shared_from_this<WorkerManager> {
: thread_(thread) {}

std::unique_ptr<ParentInspectorHandle> NewParentHandle(
uint64_t thread_id, const std::string& url);
uint64_t thread_id, const std::string& url, const std::string& name);
void WorkerStarted(uint64_t session_id, const WorkerInfo& info, bool waiting);
void WorkerFinished(uint64_t session_id);
std::unique_ptr<WorkerManagerEventHandle> SetAutoAttach(
Expand Down
6 changes: 3 additions & 3 deletions src/inspector_agent.cc
Expand Up @@ -948,17 +948,17 @@ void Agent::SetParentHandle(
}

std::unique_ptr<ParentInspectorHandle> Agent::GetParentHandle(
uint64_t thread_id, const std::string& url) {
uint64_t thread_id, const std::string& url, const std::string& name) {
if (!parent_env_->should_create_inspector() && !client_) {
ThrowUninitializedInspectorError(parent_env_);
return std::unique_ptr<ParentInspectorHandle>{};
}

CHECK_NOT_NULL(client_);
if (!parent_handle_) {
return client_->getWorkerManager()->NewParentHandle(thread_id, url);
return client_->getWorkerManager()->NewParentHandle(thread_id, url, name);
} else {
return parent_handle_->NewParentInspectorHandle(thread_id, url);
return parent_handle_->NewParentInspectorHandle(thread_id, url, name);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/inspector_agent.h
Expand Up @@ -82,7 +82,7 @@ class Agent {

void SetParentHandle(std::unique_ptr<ParentInspectorHandle> parent_handle);
std::unique_ptr<ParentInspectorHandle> GetParentHandle(
uint64_t thread_id, const std::string& url);
uint64_t thread_id, const std::string& url, const std::string& name);

// Called to create inspector sessions that can be used from the same thread.
// The inspector responds by using the delegate to send messages back.
Expand Down
6 changes: 6 additions & 0 deletions src/node.h
Expand Up @@ -590,6 +590,12 @@ NODE_EXTERN std::unique_ptr<InspectorParentHandle> GetInspectorParentHandle(
ThreadId child_thread_id,
const char* child_url);

NODE_EXTERN std::unique_ptr<InspectorParentHandle> GetInspectorParentHandle(
Environment* parent_env,
ThreadId child_thread_id,
const char* child_url,
const char* name);

struct StartExecutionCallbackInfo {
v8::Local<v8::Object> process_object;
v8::Local<v8::Function> native_require;
Expand Down
21 changes: 15 additions & 6 deletions src/node_worker.cc
Expand Up @@ -48,6 +48,7 @@ constexpr double kMB = 1024 * 1024;
Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
const std::string& name,
std::shared_ptr<PerIsolateOptions> per_isolate_opts,
std::vector<std::string>&& exec_argv,
std::shared_ptr<KVStore> env_vars,
Expand All @@ -57,6 +58,7 @@ Worker::Worker(Environment* env,
exec_argv_(exec_argv),
platform_(env->isolate_data()->platform()),
thread_id_(AllocateEnvironmentThreadId()),
name_(name),
env_vars_(env_vars),
snapshot_data_(snapshot_data) {
Debug(this, "Creating new worker instance with thread id %llu",
Expand All @@ -81,8 +83,8 @@ Worker::Worker(Environment* env,
Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
.Check();

inspector_parent_handle_ = GetInspectorParentHandle(
env, thread_id_, url.c_str());
inspector_parent_handle_ =
GetInspectorParentHandle(env, thread_id_, url.c_str(), name.c_str());

argv_ = std::vector<std::string>{env->argv()[0]};
// Mark this Worker object as weak until we actually start the thread.
Expand Down Expand Up @@ -256,11 +258,10 @@ size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
}

void Worker::Run() {
std::string name = "WorkerThread ";
name += std::to_string(thread_id_.id);
std::string trace_name = "[worker " + std::to_string(thread_id_.id) + "]" +
(name_ == "" ? "" : " " + name_);
TRACE_EVENT_METADATA1(
"__metadata", "thread_name", "name",
TRACE_STR_COPY(name.c_str()));
"__metadata", "thread_name", "name", TRACE_STR_COPY(trace_name.c_str()));
CHECK_NOT_NULL(platform_);

Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
Expand Down Expand Up @@ -454,6 +455,7 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
}

std::string url;
std::string name;
std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
std::shared_ptr<KVStore> env_vars = nullptr;

Expand All @@ -466,6 +468,12 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
url.append(value.out(), value.length());
}

if (!args[5]->IsNullOrUndefined()) {
Utf8Value value(
isolate, args[5]->ToString(env->context()).FromMaybe(Local<String>()));
name.append(value.out(), value.length());
}

if (args[1]->IsNull()) {
// Means worker.env = { ...process.env }.
env_vars = env->env_vars()->Clone(isolate);
Expand Down Expand Up @@ -579,6 +587,7 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
Worker* worker = new Worker(env,
args.This(),
url,
name,
per_isolate_opts,
std::move(exec_argv_out),
env_vars,
Expand Down
3 changes: 3 additions & 0 deletions src/node_worker.h
Expand Up @@ -29,6 +29,7 @@ class Worker : public AsyncWrap {
Worker(Environment* env,
v8::Local<v8::Object> wrap,
const std::string& url,
const std::string& name,
std::shared_ptr<PerIsolateOptions> per_isolate_opts,
std::vector<std::string>&& exec_argv,
std::shared_ptr<KVStore> env_vars,
Expand Down Expand Up @@ -98,6 +99,8 @@ class Worker : public AsyncWrap {
int exit_code_ = 0;
ThreadId thread_id_;
uintptr_t stack_base_ = 0;
// Optional name used for debugging in inspector and trace events.
std::string name_;

// Custom resource constraints:
double resource_limits_[kTotalResourceLimitCount];
Expand Down
17 changes: 17 additions & 0 deletions test/fixtures/worker-name.js
@@ -0,0 +1,17 @@
const { Session } = require('inspector');
const { parentPort } = require('worker_threads');

const session = new Session();

parentPort.once('message', () => {}); // Prevent the worker from exiting.

session.connectToMainThread();

session.on(
'NodeWorker.attachedToWorker',
({ params: { workerInfo } }) => {
// send the worker title to the main thread
parentPort.postMessage(workerInfo.title);
}
);
session.post('NodeWorker.enable', { waitForDebuggerOnStart: false });
31 changes: 31 additions & 0 deletions test/parallel/test-trace-events-worker-metadata-with-name.js
@@ -0,0 +1,31 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const cp = require('child_process');
const fs = require('fs');
const { isMainThread } = require('worker_threads');

if (isMainThread) {
const CODE = 'const { Worker } = require(\'worker_threads\'); ' +
`new Worker('${__filename.replace(/\\/g, '/')}', { name: 'foo' })`;
const FILE_NAME = 'node_trace.1.log';
const tmpdir = require('../common/tmpdir');
tmpdir.refresh();
process.chdir(tmpdir.path);

const proc = cp.spawn(process.execPath,
[ '--trace-event-categories', 'node',
'-e', CODE ]);
proc.once('exit', common.mustCall(() => {
assert(fs.existsSync(FILE_NAME));
fs.readFile(FILE_NAME, common.mustCall((err, data) => {
const traces = JSON.parse(data.toString()).traceEvents;
assert(traces.length > 0);
assert(traces.some((trace) =>
trace.cat === '__metadata' && trace.name === 'thread_name' &&
trace.args.name === '[worker 1] foo'));
}));
}));
} else {
// Do nothing here.
}
2 changes: 1 addition & 1 deletion test/parallel/test-trace-events-worker-metadata.js
Expand Up @@ -23,7 +23,7 @@ if (isMainThread) {
assert(traces.length > 0);
assert(traces.some((trace) =>
trace.cat === '__metadata' && trace.name === 'thread_name' &&
trace.args.name === 'WorkerThread 1'));
trace.args.name === '[worker 1]'));
}));
}));
} else {
Expand Down
22 changes: 22 additions & 0 deletions test/parallel/test-worker-name.js
@@ -0,0 +1,22 @@
'use strict';

const common = require('../common');
const fixtures = require('../common/fixtures');

common.skipIfInspectorDisabled();
common.skipIfWorker(); // This test requires both main and worker threads.

const assert = require('assert');
const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
const name = 'Hello Thread';
const expectedTitle = `[worker 1] ${name}`;
const worker = new Worker(fixtures.path('worker-name.js'), {
name,
});
worker.once('message', common.mustCall((message) => {
assert.strictEqual(message, expectedTitle);
worker.postMessage('done');
}));
}

0 comments on commit f7423bd

Please sign in to comment.