Skip to content

Commit

Permalink
worker: add ability to take heap snapshot from parent thread
Browse files Browse the repository at this point in the history
PR-URL: #31569
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Richard Lau <riclau@uk.ibm.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
addaleax authored and targos committed Apr 28, 2020
1 parent 4aaf407 commit ab8f38b
Show file tree
Hide file tree
Showing 17 changed files with 235 additions and 77 deletions.
5 changes: 5 additions & 0 deletions doc/api/errors.md
Expand Up @@ -2069,6 +2069,11 @@ The `Worker` initialization failed.
The `execArgv` option passed to the `Worker` constructor contains
invalid flags.

<a id="ERR_WORKER_NOT_RUNNING"></a>
### `ERR_WORKER_NOT_RUNNING`

An operation failed because the `Worker` instance is not currently running.

<a id="ERR_WORKER_OUT_OF_MEMORY"></a>
### `ERR_WORKER_OUT_OF_MEMORY`

Expand Down
17 changes: 17 additions & 0 deletions doc/api/worker_threads.md
Expand Up @@ -684,6 +684,21 @@ inside the worker thread. If `stdout: true` was not passed to the
[`Worker`][] constructor, then data will be piped to the parent thread's
[`process.stdout`][] stream.

### `worker.takeHeapSnapshot()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Promise} A promise for a Readable Stream containing
a V8 heap snapshot

Returns a readable stream for a V8 snapshot of the current state of the Worker.
See [`v8.getHeapSnapshot()`][] for more details.

If the Worker thread is no longer running, which may occur before the
[`'exit'` event][] is emitted, the returned `Promise` will be rejected
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.

### `worker.terminate()`
<!-- YAML
added: v10.5.0
Expand Down Expand Up @@ -726,6 +741,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`'exit'` event]: #worker_threads_event_exit
[`AsyncResource`]: async_hooks.html#async_hooks_class_asyncresource
[`Buffer`]: buffer.html
[`ERR_WORKER_NOT_RUNNING`]: errors.html#ERR_WORKER_NOT_RUNNING
[`EventEmitter`]: events.html
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
[`MessagePort`]: #worker_threads_class_messageport
Expand Down Expand Up @@ -753,6 +769,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`require('worker_threads').threadId`]: #worker_threads_worker_threadid
[`require('worker_threads').workerData`]: #worker_threads_worker_workerdata
[`trace_events`]: tracing.html
[`v8.getHeapSnapshot()`]: v8.html#v8_v8_getheapsnapshot
[`vm`]: vm.html
[`Worker constructor options`]: #worker_threads_new_worker_filename_options
[`worker.on('message')`]: #worker_threads_event_message_1
Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Expand Up @@ -1379,6 +1379,7 @@ E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
`Initiated Worker with ${msg}: ${errors.join(', ')}`,
Error);
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
E('ERR_WORKER_OUT_OF_MEMORY',
'Worker terminated due to reaching memory limit: %s', Error);
E('ERR_WORKER_PATH',
Expand Down
41 changes: 41 additions & 0 deletions lib/internal/heap_utils.js
@@ -0,0 +1,41 @@
'use strict';
const {
Symbol
} = primordials;
const {
kUpdateTimer,
onStreamRead,
} = require('internal/stream_base_commons');
const { owner_symbol } = require('internal/async_hooks').symbols;
const { Readable } = require('stream');

const kHandle = Symbol('kHandle');

class HeapSnapshotStream extends Readable {
constructor(handle) {
super({ autoDestroy: true });
this[kHandle] = handle;
handle[owner_symbol] = this;
handle.onread = onStreamRead;
}

_read() {
if (this[kHandle])
this[kHandle].readStart();
}

_destroy() {
// Release the references on the handle so that
// it can be garbage collected.
this[kHandle][owner_symbol] = undefined;
this[kHandle] = undefined;
}

[kUpdateTimer]() {
// Does nothing
}
}

module.exports = {
HeapSnapshotStream
};
12 changes: 12 additions & 0 deletions lib/internal/worker.js
Expand Up @@ -19,6 +19,7 @@ const path = require('path');

const errorCodes = require('internal/errors').codes;
const {
ERR_WORKER_NOT_RUNNING,
ERR_WORKER_PATH,
ERR_WORKER_UNSERIALIZABLE_ERROR,
ERR_WORKER_UNSUPPORTED_EXTENSION,
Expand Down Expand Up @@ -318,6 +319,17 @@ class Worker extends EventEmitter {

return makeResourceLimits(this[kHandle].getResourceLimits());
}

getHeapSnapshot() {
const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot();
return new Promise((resolve, reject) => {
if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING());
heapSnapshotTaker.ondone = (handle) => {
const { HeapSnapshotStream } = require('internal/heap_utils');
resolve(new HeapSnapshotStream(handle));
};
});
}
}

function pipeWithoutWarning(source, dest) {
Expand Down
37 changes: 2 additions & 35 deletions lib/v8.js
Expand Up @@ -25,7 +25,6 @@ const {
Int8Array,
Map,
ObjectPrototypeToString,
Symbol,
Uint16Array,
Uint32Array,
Uint8Array,
Expand All @@ -48,14 +47,7 @@ const {
createHeapSnapshotStream,
triggerHeapSnapshot
} = internalBinding('heap_utils');
const { Readable } = require('stream');
const { owner_symbol } = require('internal/async_hooks').symbols;
const {
kUpdateTimer,
onStreamRead,
} = require('internal/stream_base_commons');
const kHandle = Symbol('kHandle');

const { HeapSnapshotStream } = require('internal/heap_utils');

function writeHeapSnapshot(filename) {
if (filename !== undefined) {
Expand All @@ -65,31 +57,6 @@ function writeHeapSnapshot(filename) {
return triggerHeapSnapshot(filename);
}

class HeapSnapshotStream extends Readable {
constructor(handle) {
super({ autoDestroy: true });
this[kHandle] = handle;
handle[owner_symbol] = this;
handle.onread = onStreamRead;
}

_read() {
if (this[kHandle])
this[kHandle].readStart();
}

_destroy() {
// Release the references on the handle so that
// it can be garbage collected.
this[kHandle][owner_symbol] = undefined;
this[kHandle] = undefined;
}

[kUpdateTimer]() {
// Does nothing
}
}

function getHeapSnapshot() {
const handle = createHeapSnapshotStream();
assert(handle);
Expand Down Expand Up @@ -321,5 +288,5 @@ module.exports = {
DefaultDeserializer,
deserialize,
serialize,
writeHeapSnapshot
writeHeapSnapshot,
};
1 change: 1 addition & 0 deletions node.gyp
Expand Up @@ -138,6 +138,7 @@
'lib/internal/fs/utils.js',
'lib/internal/fs/watchers.js',
'lib/internal/http.js',
'lib/internal/heap_utils.js',
'lib/internal/histogram.js',
'lib/internal/idna.js',
'lib/internal/inspector_async_hook.js',
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Expand Up @@ -70,6 +70,7 @@ namespace node {
V(UDPWRAP) \
V(SIGINTWATCHDOG) \
V(WORKER) \
V(WORKERHEAPSNAPSHOT) \
V(WRITEWRAP) \
V(ZLIB)

Expand Down
3 changes: 2 additions & 1 deletion src/env.h
Expand Up @@ -425,7 +425,8 @@ constexpr size_t kFsStatsBufferLength =
V(streambaseoutputstream_constructor_template, v8::ObjectTemplate) \
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tty_constructor_template, v8::FunctionTemplate) \
V(write_wrap_template, v8::ObjectTemplate)
V(write_wrap_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate)

#define ENVIRONMENT_STRONG_PERSISTENT_VALUES(V) \
V(as_callback_data, v8::Object) \
Expand Down
74 changes: 39 additions & 35 deletions src/heap_utils.cc
Expand Up @@ -236,26 +236,24 @@ class HeapSnapshotStream : public AsyncWrap,
public:
HeapSnapshotStream(
Environment* env,
const HeapSnapshot* snapshot,
HeapSnapshotPointer&& snapshot,
v8::Local<v8::Object> obj) :
AsyncWrap(env, obj, AsyncWrap::PROVIDER_HEAPSNAPSHOT),
StreamBase(env),
snapshot_(snapshot) {
snapshot_(std::move(snapshot)) {
MakeWeak();
StreamBase::AttachToObject(GetObject());
}

~HeapSnapshotStream() override {
Cleanup();
}
~HeapSnapshotStream() override {}

int GetChunkSize() override {
return 65536; // big chunks == faster
}

void EndOfStream() override {
EmitRead(UV_EOF);
Cleanup();
snapshot_.reset();
}

WriteResult WriteAsciiChunk(char* data, int size) override {
Expand Down Expand Up @@ -309,22 +307,13 @@ class HeapSnapshotStream : public AsyncWrap,
SET_SELF_SIZE(HeapSnapshotStream)

private:
void Cleanup() {
if (snapshot_ != nullptr) {
const_cast<HeapSnapshot*>(snapshot_)->Delete();
snapshot_ = nullptr;
}
}


const HeapSnapshot* snapshot_;
HeapSnapshotPointer snapshot_;
};

inline void TakeSnapshot(Isolate* isolate, v8::OutputStream* out) {
const HeapSnapshot* const snapshot =
isolate->GetHeapProfiler()->TakeHeapSnapshot();
HeapSnapshotPointer snapshot {
isolate->GetHeapProfiler()->TakeHeapSnapshot() };
snapshot->Serialize(out, HeapSnapshot::kJSON);
const_cast<HeapSnapshot*>(snapshot)->Delete();
}

inline bool WriteSnapshot(Isolate* isolate, const char* filename) {
Expand All @@ -339,20 +328,44 @@ inline bool WriteSnapshot(Isolate* isolate, const char* filename) {

} // namespace

void CreateHeapSnapshotStream(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
void DeleteHeapSnapshot(const v8::HeapSnapshot* snapshot) {
const_cast<HeapSnapshot*>(snapshot)->Delete();
}

BaseObjectPtr<AsyncWrap> CreateHeapSnapshotStream(
Environment* env, HeapSnapshotPointer&& snapshot) {
HandleScope scope(env->isolate());
const HeapSnapshot* const snapshot =
env->isolate()->GetHeapProfiler()->TakeHeapSnapshot();
CHECK_NOT_NULL(snapshot);

if (env->streambaseoutputstream_constructor_template().IsEmpty()) {
// Create FunctionTemplate for HeapSnapshotStream
Local<FunctionTemplate> os = FunctionTemplate::New(env->isolate());
os->Inherit(AsyncWrap::GetConstructorTemplate(env));
Local<ObjectTemplate> ost = os->InstanceTemplate();
ost->SetInternalFieldCount(StreamBase::kStreamBaseFieldCount);
os->SetClassName(
FIXED_ONE_BYTE_STRING(env->isolate(), "HeapSnapshotStream"));
StreamBase::AddMethods(env, os);
env->set_streambaseoutputstream_constructor_template(ost);
}

Local<Object> obj;
if (!env->streambaseoutputstream_constructor_template()
->NewInstance(env->context())
.ToLocal(&obj)) {
return;
return {};
}
HeapSnapshotStream* out = new HeapSnapshotStream(env, snapshot, obj);
args.GetReturnValue().Set(out->object());
return MakeBaseObject<HeapSnapshotStream>(env, std::move(snapshot), obj);
}

void CreateHeapSnapshotStream(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
HeapSnapshotPointer snapshot {
env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
CHECK(snapshot);
BaseObjectPtr<AsyncWrap> stream =
CreateHeapSnapshotStream(env, std::move(snapshot));
if (stream)
args.GetReturnValue().Set(stream->object());
}

void TriggerHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
Expand Down Expand Up @@ -388,15 +401,6 @@ void Initialize(Local<Object> target,
env->SetMethod(target, "buildEmbedderGraph", BuildEmbedderGraph);
env->SetMethod(target, "triggerHeapSnapshot", TriggerHeapSnapshot);
env->SetMethod(target, "createHeapSnapshotStream", CreateHeapSnapshotStream);

// Create FunctionTemplate for HeapSnapshotStream
Local<FunctionTemplate> os = FunctionTemplate::New(env->isolate());
os->Inherit(AsyncWrap::GetConstructorTemplate(env));
Local<ObjectTemplate> ost = os->InstanceTemplate();
ost->SetInternalFieldCount(StreamBase::kStreamBaseFieldCount);
os->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "HeapSnapshotStream"));
StreamBase::AddMethods(env, os);
env->set_streambaseoutputstream_constructor_template(ost);
}

} // namespace heap
Expand Down
10 changes: 10 additions & 0 deletions src/node_internals.h
Expand Up @@ -386,6 +386,16 @@ class TraceEventScope {
void* id_;
};

namespace heap {

void DeleteHeapSnapshot(const v8::HeapSnapshot* snapshot);
using HeapSnapshotPointer =
DeleteFnPtr<const v8::HeapSnapshot, DeleteHeapSnapshot>;

BaseObjectPtr<AsyncWrap> CreateHeapSnapshotStream(
Environment* env, HeapSnapshotPointer&& snapshot);
} // namespace heap

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down

0 comments on commit ab8f38b

Please sign in to comment.