Skip to content

Commit

Permalink
worker: allow specifying resource limits
Browse files Browse the repository at this point in the history
Allow specifying resource limits for the JS engine instance created
as part of a Worker.

PR-URL: #26628
Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
  • Loading branch information
addaleax authored and BethGriggs committed Feb 6, 2020
1 parent 6bee687 commit 7a8b659
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 26 deletions.
5 changes: 5 additions & 0 deletions doc/api/errors.md
Expand Up @@ -1990,6 +1990,11 @@ meaning of the error depends on the specific function.
The `execArgv` option passed to the `Worker` constructor contains
invalid flags.

<a id="ERR_WORKER_OUT_OF_MEMORY"></a>
### `ERR_WORKER_OUT_OF_MEMORY`

The `Worker` instance terminated because it reached its memory limit.

<a id="ERR_WORKER_PATH"></a>
### `ERR_WORKER_PATH`

Expand Down
48 changes: 48 additions & 0 deletions doc/api/worker_threads.md
Expand Up @@ -157,6 +157,22 @@ console.log(receiveMessageOnPort(port2));
When this function is used, no `'message'` event will be emitted and the
`onmessage` listener will not be invoked.

### `worker.resourceLimits`
<!-- YAML
added: REPLACEME
-->

* {Object|undefined}
* `maxYoungGenerationSizeMb` {number}
* `maxOldGenerationSizeMb` {number}
* `codeRangeSizeMb` {number}

Provides the set of JS engine resource constraints inside this Worker thread.
If the `resourceLimits` option was passed to the [`Worker`][] constructor,
this matches its values.

If this is used in the main thread, its value is an empty object.

## `worker.SHARE_ENV`
<!-- YAML
added: v11.14.0
Expand Down Expand Up @@ -488,6 +504,13 @@ if (isMainThread) {
```

### `new Worker(filename[, options])`
<!-- YAML
added: v10.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/26628
description: The `resourceLimits` option was introduced.
-->

* `filename` {string} The path to the Worker’s main script. Must be
either an absolute path or a relative path (i.e. relative to the
Expand Down Expand Up @@ -519,6 +542,16 @@ if (isMainThread) {
occur as described in the [HTML structured clone algorithm][], and an error
will be thrown if the object cannot be cloned (e.g. because it contains
`function`s).
* `resourceLimits` {Object} An optional set of resource limits for the new
JS engine instance. Reaching these limits will lead to termination of the
`Worker` instance. These limits only affect the JS engine, and no external
data, including no `ArrayBuffer`s. Even if these limits are set, the process
may still abort if it encounters a global out-of-memory situation.
* `maxOldGenerationSizeMb` {number} The maximum size of the main heap in MB.
* `maxYoungGenerationSizeMb` {number} The maximum size of a heap space for
recently created objects.
* `codeRangeSizeMb` {number} The size of a pre-allocated memory range
used for generated code.

### Event: `'error'`
<!-- YAML
Expand Down Expand Up @@ -583,6 +616,21 @@ Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker will
behavior). If the worker is `ref()`ed, calling `ref()` again will have
no effect.

### `worker.resourceLimits`
<!-- YAML
added: REPLACEME
-->

* {Object}
* `maxYoungGenerationSizeMb` {number}
* `maxOldGenerationSizeMb` {number}
* `codeRangeSizeMb` {number}

Provides the set of JS engine resource constraints for this Worker thread.
If the `resourceLimits` option was passed to the [`Worker`][] constructor,
this matches its values.

If the worker has stopped, the return value is an empty object.
### `worker.stderr`
<!-- YAML
added: v10.5.0
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/errors.js
Expand Up @@ -1218,6 +1218,8 @@ E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors) =>
`Initiated Worker with invalid execArgv flags: ${errors.join(', ')}`,
Error);
E('ERR_WORKER_OUT_OF_MEMORY', 'Worker terminated due to reaching memory limit',
Error);
E('ERR_WORKER_PATH',
'The worker script filename must be an absolute path or a relative ' +
'path starting with \'./\' or \'../\'. Received "%s"',
Expand Down
52 changes: 47 additions & 5 deletions lib/internal/worker.js
Expand Up @@ -2,19 +2,20 @@

/* global SharedArrayBuffer */

const { Object } = primordials;
const { Math, Object } = primordials;

const EventEmitter = require('events');
const assert = require('internal/assert');
const path = require('path');

const errorCodes = require('internal/errors').codes;
const {
ERR_WORKER_PATH,
ERR_WORKER_UNSERIALIZABLE_ERROR,
ERR_WORKER_UNSUPPORTED_EXTENSION,
ERR_WORKER_INVALID_EXEC_ARGV,
ERR_INVALID_ARG_TYPE,
} = require('internal/errors').codes;
} = errorCodes;
const { validateString } = require('internal/validators');
const { getOptionValue } = require('internal/options');

Expand All @@ -37,8 +38,13 @@ const { pathToFileURL } = require('url');
const {
ownsProcessState,
isMainThread,
resourceLimits: resourceLimitsRaw,
threadId,
Worker: WorkerImpl,
kMaxYoungGenerationSizeMb,
kMaxOldGenerationSizeMb,
kCodeRangeSizeMb,
kTotalResourceLimitCount
} = internalBinding('worker');

const kHandle = Symbol('kHandle');
Expand Down Expand Up @@ -102,7 +108,8 @@ class Worker extends EventEmitter {

const url = options.eval ? null : pathToFileURL(filename);
// Set up the C++ handle for the worker, as well as some internal wiring.
this[kHandle] = new WorkerImpl(url, options.execArgv);
this[kHandle] = new WorkerImpl(url, options.execArgv,
parseResourceLimits(options.resourceLimits));
if (this[kHandle].invalidExecArgv) {
throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
}
Expand All @@ -113,7 +120,7 @@ class Worker extends EventEmitter {
} else if (env !== undefined) {
this[kHandle].setEnvVars(env);
}
this[kHandle].onexit = (code) => this[kOnExit](code);
this[kHandle].onexit = (code, customErr) => this[kOnExit](code, customErr);
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (data) => this[kOnMessage](data));
this[kPort].start();
Expand Down Expand Up @@ -157,11 +164,15 @@ class Worker extends EventEmitter {
this[kHandle].startThread();
}

[kOnExit](code) {
[kOnExit](code, customErr) {
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
drainMessagePort(this[kPublicPort]);
drainMessagePort(this[kPort]);
this[kDispose]();
if (customErr) {
debug(`[${threadId}] failing with custom error ${customErr}`);
this.emit('error', new errorCodes[customErr]());
}
this.emit('exit', code);
this.removeAllListeners();
}
Expand Down Expand Up @@ -280,6 +291,12 @@ class Worker extends EventEmitter {
get stderr() {
return this[kParentSideStdio].stderr;
}

get resourceLimits() {
if (this[kHandle] === null) return {};

return makeResourceLimits(this[kHandle].getResourceLimits());
}
}

function pipeWithoutWarning(source, dest) {
Expand All @@ -294,10 +311,35 @@ function pipeWithoutWarning(source, dest) {
dest._maxListeners = destMaxListeners;
}

const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount);
function parseResourceLimits(obj) {
const ret = resourceLimitsArray;
ret.fill(-1);
if (typeof obj !== 'object' || obj === null) return ret;

if (typeof obj.maxOldGenerationSizeMb === 'number')
ret[kMaxOldGenerationSizeMb] = Math.max(obj.maxOldGenerationSizeMb, 2);
if (typeof obj.maxYoungGenerationSizeMb === 'number')
ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb;
if (typeof obj.codeRangeSizeMb === 'number')
ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb;
return ret;
}

function makeResourceLimits(float64arr) {
return {
maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb],
maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb],
codeRangeSizeMb: float64arr[kCodeRangeSizeMb]
};
}

module.exports = {
ownsProcessState,
isMainThread,
SHARE_ENV,
resourceLimits:
!isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
threadId,
Worker,
};
2 changes: 2 additions & 0 deletions lib/worker_threads.js
Expand Up @@ -3,6 +3,7 @@
const {
isMainThread,
SHARE_ENV,
resourceLimits,
threadId,
Worker
} = require('internal/worker');
Expand All @@ -20,6 +21,7 @@ module.exports = {
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort,
resourceLimits,
threadId,
SHARE_ENV,
Worker,
Expand Down

0 comments on commit 7a8b659

Please sign in to comment.