From e7b5c0ed47a18fcb24c4258979c211a1c5d5bf48 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 29 Mar 2023 20:02:10 +0200 Subject: [PATCH] stream: add setter & getter for default highWaterMark (#46929) Adds stream.(get|set)DefaultHighWaterMark to read or update the default hwm. PR-URL: https://github.com/nodejs/node/pull/46929 Reviewed-By: Matteo Collina Reviewed-By: Paolo Insogna Reviewed-By: Moshe Atlow Reviewed-By: Luigi Pinca Reviewed-By: Benjamin Gruenbaum Reviewed-By: Michael Dawson Reviewed-By: Erick Wendel --- doc/api/stream.md | 23 +++++++++++++ lib/_http_outgoing.js | 8 ++--- lib/internal/streams/state.js | 16 ++++++++- lib/stream.js | 3 ++ test/parallel/test-stream-set-default-hwm.js | 36 ++++++++++++++++++++ 5 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream-set-default-hwm.js diff --git a/doc/api/stream.md b/doc/api/stream.md index a04c30fbd10c2a..f01b6ad6e1f727 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3348,6 +3348,29 @@ reader.read().then(({ value, done }) => { }); ``` +### `stream.getDefaultHighWaterMark(objectMode)` + + + +* {boolean} objectMode +* Returns: {integer} + +Returns the default highWaterMark used by streams. +Defaults to `16384` (16 KiB), or `16` for `objectMode`. + +### `stream.setDefaultHighWaterMark(objectMode, value)` + + + +* {boolean} objectMode +* {integer} highWaterMark value + +Sets the default highWaterMark used by streams. + ## API for stream implementers diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 7287120c43b713..6b1b8703f9f0de 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -80,12 +80,11 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => { debug = fn; }); -const HIGH_WATER_MARK = getDefaultHighWaterMark(); - const kCorked = Symbol('corked'); const kUniqueHeaders = Symbol('kUniqueHeaders'); const kBytesWritten = Symbol('kBytesWritten'); const kErrored = Symbol('errored'); +const kHighWaterMark = Symbol('kHighWaterMark'); const nop = () => {}; @@ -150,6 +149,7 @@ function OutgoingMessage() { this._onPendingData = nop; this[kErrored] = null; + this[kHighWaterMark] = getDefaultHighWaterMark(); } ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype); ObjectSetPrototypeOf(OutgoingMessage, Stream); @@ -196,7 +196,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', { ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { __proto__: null, get() { - return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK; + return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark]; }, }); @@ -403,7 +403,7 @@ function _writeRaw(data, encoding, callback, size) { this.outputData.push({ data, encoding, callback }); this.outputSize += data.length; this._onPendingData(data.length); - return this.outputSize < HIGH_WATER_MARK; + return this.outputSize < this[kHighWaterMark]; } diff --git a/lib/internal/streams/state.js b/lib/internal/streams/state.js index 7b05a1ab0b5a51..98f8d8a6cc33d5 100644 --- a/lib/internal/streams/state.js +++ b/lib/internal/streams/state.js @@ -4,16 +4,29 @@ const { MathFloor, NumberIsInteger, } = primordials; +const { validateInteger } = require('internal/validators'); const { ERR_INVALID_ARG_VALUE } = require('internal/errors').codes; +let defaultHighWaterMarkBytes = 16 * 1024; +let defaultHighWaterMarkObjectMode = 16; + function highWaterMarkFrom(options, isDuplex, duplexKey) { return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null; } function getDefaultHighWaterMark(objectMode) { - return objectMode ? 16 : 16 * 1024; + return objectMode ? defaultHighWaterMarkObjectMode : defaultHighWaterMarkBytes; +} + +function setDefaultHighWaterMark(objectMode, value) { + validateInteger(value, 'value', 0); + if (objectMode) { + defaultHighWaterMarkObjectMode = value; + } else { + defaultHighWaterMarkBytes = value; + } } function getHighWaterMark(state, options, duplexKey, isDuplex) { @@ -33,4 +46,5 @@ function getHighWaterMark(state, options, duplexKey, isDuplex) { module.exports = { getHighWaterMark, getDefaultHighWaterMark, + setDefaultHighWaterMark, }; diff --git a/lib/stream.js b/lib/stream.js index e8f205c056834f..9a09401e7d016a 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -42,6 +42,7 @@ const { }, } = require('internal/errors'); const compose = require('internal/streams/compose'); +const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('internal/streams/state'); const { pipeline } = require('internal/streams/pipeline'); const { destroyer } = require('internal/streams/destroy'); const eos = require('internal/streams/end-of-stream'); @@ -105,6 +106,8 @@ Stream.addAbortSignal = addAbortSignal; Stream.finished = eos; Stream.destroy = destroyer; Stream.compose = compose; +Stream.setDefaultHighWaterMark = setDefaultHighWaterMark; +Stream.getDefaultHighWaterMark = getDefaultHighWaterMark; ObjectDefineProperty(Stream, 'promises', { __proto__: null, diff --git a/test/parallel/test-stream-set-default-hwm.js b/test/parallel/test-stream-set-default-hwm.js new file mode 100644 index 00000000000000..3d78907b74f5a5 --- /dev/null +++ b/test/parallel/test-stream-set-default-hwm.js @@ -0,0 +1,36 @@ +'use strict'; + +require('../common'); + +const assert = require('node:assert'); +const { + setDefaultHighWaterMark, + getDefaultHighWaterMark, + Writable, + Readable, + Transform +} = require('stream'); + +assert.notStrictEqual(getDefaultHighWaterMark(false), 32 * 1000); +setDefaultHighWaterMark(false, 32 * 1000); +assert.strictEqual(getDefaultHighWaterMark(false), 32 * 1000); + +assert.notStrictEqual(getDefaultHighWaterMark(true), 32); +setDefaultHighWaterMark(true, 32); +assert.strictEqual(getDefaultHighWaterMark(true), 32); + +const w = new Writable({ + write() {} +}); +assert.strictEqual(w.writableHighWaterMark, 32 * 1000); + +const r = new Readable({ + read() {} +}); +assert.strictEqual(r.readableHighWaterMark, 32 * 1000); + +const t = new Transform({ + transform() {} +}); +assert.strictEqual(t.writableHighWaterMark, 32 * 1000); +assert.strictEqual(t.readableHighWaterMark, 32 * 1000);