Skip to content

Commit

Permalink
lib: reuse invalid state errors on webstreams
Browse files Browse the repository at this point in the history
Signed-off-by: RafaelGSS <rafael.nunu@hotmail.com>
PR-URL: nodejs#46086
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
RafaelGSS authored and theanarkh committed Jan 13, 2023
1 parent 31ea7be commit c1cc10d
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 10 deletions.
15 changes: 15 additions & 0 deletions doc/api/process.md
Expand Up @@ -1103,6 +1103,21 @@ and [Cluster][] documentation), the `process.connected` property will return
Once `process.connected` is `false`, it is no longer possible to send messages
over the IPC channel using `process.send()`.

## `process.constrainedMemory()`

<!-- YAML
added: REPLACEME
-->

* {number}

Gets the amount of memory available to the process (in bytes) based on
limits imposed by the OS. If there is no such constraint, or the constraint
is unknown, `0` is returned. It is not unusual for this value to
be less than or greater than `os.totalmem()`. This function currently only
returns a non-zero value on Linux, based on cgroups if it is present, and
on z/OS based on `RLIMIT_MEMLIMIT`.

## `process.cpuUsage([previousValue])`

<!-- YAML
Expand Down
1 change: 1 addition & 0 deletions lib/internal/bootstrap/node.js
Expand Up @@ -184,6 +184,7 @@ const rawMethods = internalBinding('process_methods');

process.hrtime = perThreadSetup.hrtime;
process.hrtime.bigint = perThreadSetup.hrtimeBigInt;
process.constrainedMemory = perThreadSetup.constrainedMemory;

process.openStdin = function() {
process.stdin.resume();
Expand Down
8 changes: 8 additions & 0 deletions lib/internal/process/per_thread.js
Expand Up @@ -60,6 +60,7 @@ const binding = internalBinding('process_methods');

let hrValues;
let hrBigintValues;
let constrainedMemoryValus;

function refreshHrtimeBuffer() {
// The 3 entries filled in by the original process.hrtime contains
Expand All @@ -69,6 +70,7 @@ function refreshHrtimeBuffer() {
// Use a BigUint64Array in the closure because this is actually a bit
// faster than simply returning a BigInt from C++ in V8 7.1.
hrBigintValues = new BigUint64Array(binding.hrtimeBuffer, 0, 1);
constrainedMemoryValus = new BigUint64Array(binding.hrtimeBuffer, 0, 1);
}

// Create the buffers.
Expand Down Expand Up @@ -100,6 +102,11 @@ function hrtimeBigInt() {
return hrBigintValues[0];
}

function constrainedMemory() {
binding.constrainedMemory();
return constrainedMemoryValus[0];
}

function nop() {}

// The execution of this function itself should not cause any side effects.
Expand Down Expand Up @@ -427,4 +434,5 @@ module.exports = {
hrtime,
hrtimeBigInt,
refreshHrtimeBuffer,
constrainedMemory,
};
38 changes: 32 additions & 6 deletions lib/internal/webstreams/readablestream.js
Expand Up @@ -54,6 +54,7 @@ const {
isArrayBufferDetached,
kEmptyObject,
kEnumerableProperty,
SideEffectFreeRegExpPrototypeSymbolReplace,
} = require('internal/util');

const {
Expand Down Expand Up @@ -140,6 +141,32 @@ const kError = Symbol('kError');
const kPull = Symbol('kPull');
const kRelease = Symbol('kRelease');

let releasedError;
let releasingError;

const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm;

function lazyReadableReleasedError() {
if (releasedError) {
return releasedError;
}

releasedError = new ERR_INVALID_STATE.TypeError('Reader released');
// Avoid V8 leak and remove userland stackstrace
releasedError.stack = SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, releasedError.stack, '');
return releasedError;
}

function lazyReadableReleasingError() {
if (releasingError) {
return releasingError;
}
releasingError = new ERR_INVALID_STATE.TypeError('Releasing reader');
// Avoid V8 leak and remove userland stackstrace
releasingError.stack = SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, releasingError.stack, '');
return releasingError;
}

const getNonWritablePropertyDescriptor = (value) => {
return {
__proto__: null,
Expand Down Expand Up @@ -2029,7 +2056,7 @@ function readableStreamDefaultReaderRelease(reader) {
readableStreamReaderGenericRelease(reader);
readableStreamDefaultReaderErrorReadRequests(
reader,
new ERR_INVALID_STATE.TypeError('Releasing reader')
lazyReadableReleasingError(),
);
}

Expand All @@ -2044,7 +2071,7 @@ function readableStreamBYOBReaderRelease(reader) {
readableStreamReaderGenericRelease(reader);
readableStreamBYOBReaderErrorReadIntoRequests(
reader,
new ERR_INVALID_STATE.TypeError('Releasing reader')
lazyReadableReleasingError(),
);
}

Expand All @@ -2062,13 +2089,12 @@ function readableStreamReaderGenericRelease(reader) {
assert(stream !== undefined);
assert(stream[kState].reader === reader);

const releasedStateError = lazyReadableReleasedError();
if (stream[kState].state === 'readable') {
reader[kState].close.reject?.(
new ERR_INVALID_STATE.TypeError('Reader released'));
reader[kState].close.reject?.(releasedStateError);
} else {
reader[kState].close = {
promise: PromiseReject(
new ERR_INVALID_STATE.TypeError('Reader released')),
promise: PromiseReject(releasedStateError),
resolve: undefined,
reject: undefined,
};
Expand Down
22 changes: 18 additions & 4 deletions lib/internal/webstreams/writablestream.js
Expand Up @@ -34,6 +34,7 @@ const {
createDeferredPromise,
customInspectSymbol: kInspect,
kEnumerableProperty,
SideEffectFreeRegExpPrototypeSymbolReplace,
} = require('internal/util');

const {
Expand Down Expand Up @@ -77,6 +78,20 @@ const kAbort = Symbol('kAbort');
const kCloseSentinel = Symbol('kCloseSentinel');
const kError = Symbol('kError');

let releasedError;

function lazyWritableReleasedError() {
if (releasedError) {
return releasedError;
}
const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm;

releasedError = new ERR_INVALID_STATE.TypeError('Writer has been released');
// Avoid V8 leak and remove userland stackstrace
releasedError.stack = SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, releasedError.stack, '');
return releasedError;
}

const getNonWritablePropertyDescriptor = (value) => {
return {
__proto__: null,
Expand Down Expand Up @@ -970,10 +985,9 @@ function writableStreamDefaultWriterRelease(writer) {
} = writer[kState];
assert(stream !== undefined);
assert(stream[kState].writer === writer);
const releasedError =
new ERR_INVALID_STATE.TypeError('Writer has been released');
writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
const releasedStateError = lazyWritableReleasedError();
writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedStateError);
writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedStateError);
stream[kState].writer = undefined;
writer[kState].stream = undefined;
}
Expand Down
6 changes: 6 additions & 0 deletions src/node_process.h
Expand Up @@ -81,6 +81,11 @@ class BindingData : public SnapshotableObject {

static void SlowBigInt(const v8::FunctionCallbackInfo<v8::Value>& args);

static void ConstrainedMemoryImpl(BindingData* receiver);
static void SlowGetConstrainedMemory(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void FastGetConstrainedMemory(v8::Local<v8::Value> receiver);

private:
static constexpr size_t kBufferSize =
std::max(sizeof(uint64_t), sizeof(uint32_t) * 3);
Expand All @@ -92,6 +97,7 @@ class BindingData : public SnapshotableObject {
// time.
static v8::CFunction fast_number_;
static v8::CFunction fast_bigint_;
static v8::CFunction fast_get_constrained_memory_;
};

} // namespace process
Expand Down
27 changes: 27 additions & 0 deletions src/node_process_methods.cc
Expand Up @@ -471,11 +471,18 @@ BindingData::BindingData(Environment* env, v8::Local<v8::Object> object)

v8::CFunction BindingData::fast_number_(v8::CFunction::Make(FastNumber));
v8::CFunction BindingData::fast_bigint_(v8::CFunction::Make(FastBigInt));
v8::CFunction BindingData::fast_get_constrained_memory_ =
v8::CFunction::Make(FastGetConstrainedMemory);

void BindingData::AddMethods() {
Local<Context> ctx = env()->context();
SetFastMethod(ctx, object(), "hrtime", SlowNumber, &fast_number_);
SetFastMethod(ctx, object(), "hrtimeBigInt", SlowBigInt, &fast_bigint_);
SetFastMethod(ctx,
object(),
"constrainedMemory",
SlowGetConstrainedMemory,
&fast_get_constrained_memory_);
}

void BindingData::RegisterExternalReferences(
Expand All @@ -486,6 +493,9 @@ void BindingData::RegisterExternalReferences(
registry->Register(FastBigInt);
registry->Register(fast_number_.GetTypeInfo());
registry->Register(fast_bigint_.GetTypeInfo());
registry->Register(SlowGetConstrainedMemory);
registry->Register(FastGetConstrainedMemory);
registry->Register(fast_get_constrained_memory_.GetTypeInfo());
}

BindingData* BindingData::FromV8Value(Local<Value> value) {
Expand Down Expand Up @@ -533,6 +543,23 @@ void BindingData::SlowNumber(const v8::FunctionCallbackInfo<v8::Value>& args) {
NumberImpl(FromJSObject<BindingData>(args.Holder()));
}

void BindingData::ConstrainedMemoryImpl(BindingData* receiver) {
// Make sure we don't accidentally access buffers wiped for snapshot.
CHECK(!receiver->array_buffer_.IsEmpty());
uint64_t t = uv_get_constrained_memory();
uint64_t* fields = static_cast<uint64_t*>(receiver->backing_store_->Data());
fields[0] = t;
}

void BindingData::SlowGetConstrainedMemory(
const FunctionCallbackInfo<Value>& args) {
ConstrainedMemoryImpl(FromJSObject<BindingData>(args.Holder()));
}

void BindingData::FastGetConstrainedMemory(v8::Local<v8::Value> receiver) {
ConstrainedMemoryImpl(FromV8Value(receiver));
}

bool BindingData::PrepareForSerialization(Local<Context> context,
v8::SnapshotCreator* creator) {
// It's not worth keeping.
Expand Down
12 changes: 12 additions & 0 deletions test/parallel/test-process-constrained-memory.js
@@ -0,0 +1,12 @@
'use strict';
require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');

if (!process.env.isWorker) {
process.env.isWorker = true;
new Worker(__filename);
assert(process.constrainedMemory() >= 0);
} else {
assert(process.constrainedMemory() >= 0);
}

0 comments on commit c1cc10d

Please sign in to comment.