diff --git a/doc/api/stream.md b/doc/api/stream.md index 8ec678a56a223f..293393c6bd29b7 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3346,6 +3346,29 @@ reader.read().then(({ value, done }) => { }); ``` +### `stream.getDefaultHighWaterMark(objectMode)` + + + +* {boolean} objectMode +* Returns: {integer} + +Returns the default highWaterMark used by streams. +Defaults to `16384` (16 KiB), or `16` for `objectMode`. + +### `stream.setDefaultHighWaterMark(objectMode, value)` + + + +* {boolean} objectMode +* {integer} highWaterMark value + +Sets the default highWaterMark used by streams. + ## API for stream implementers diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index bd9d5e45bfb3c1..6db6298674417d 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -80,12 +80,11 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => { debug = fn; }); -const HIGH_WATER_MARK = getDefaultHighWaterMark(); - const kCorked = Symbol('corked'); const kUniqueHeaders = Symbol('kUniqueHeaders'); const kBytesWritten = Symbol('kBytesWritten'); const kErrored = Symbol('errored'); +const kHighWaterMark = Symbol('kHighWaterMark'); const nop = () => {}; @@ -150,6 +149,7 @@ function OutgoingMessage() { this._onPendingData = nop; this[kErrored] = null; + this[kHighWaterMark] = getDefaultHighWaterMark(); } ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype); ObjectSetPrototypeOf(OutgoingMessage, Stream); @@ -196,7 +196,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', { ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { __proto__: null, get() { - return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK; + return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark]; }, }); @@ -403,7 +403,7 @@ function _writeRaw(data, encoding, callback, size) { this.outputData.push({ data, encoding, callback }); this.outputSize += data.length; this._onPendingData(data.length); - return this.outputSize < HIGH_WATER_MARK; + return this.outputSize < this[kHighWaterMark]; } diff --git a/lib/internal/streams/state.js b/lib/internal/streams/state.js index 7b05a1ab0b5a51..98f8d8a6cc33d5 100644 --- a/lib/internal/streams/state.js +++ b/lib/internal/streams/state.js @@ -4,16 +4,29 @@ const { MathFloor, NumberIsInteger, } = primordials; +const { validateInteger } = require('internal/validators'); const { ERR_INVALID_ARG_VALUE } = require('internal/errors').codes; +let defaultHighWaterMarkBytes = 16 * 1024; +let defaultHighWaterMarkObjectMode = 16; + function highWaterMarkFrom(options, isDuplex, duplexKey) { return options.highWaterMark != null ? options.highWaterMark : isDuplex ? options[duplexKey] : null; } function getDefaultHighWaterMark(objectMode) { - return objectMode ? 16 : 16 * 1024; + return objectMode ? defaultHighWaterMarkObjectMode : defaultHighWaterMarkBytes; +} + +function setDefaultHighWaterMark(objectMode, value) { + validateInteger(value, 'value', 0); + if (objectMode) { + defaultHighWaterMarkObjectMode = value; + } else { + defaultHighWaterMarkBytes = value; + } } function getHighWaterMark(state, options, duplexKey, isDuplex) { @@ -33,4 +46,5 @@ function getHighWaterMark(state, options, duplexKey, isDuplex) { module.exports = { getHighWaterMark, getDefaultHighWaterMark, + setDefaultHighWaterMark, }; diff --git a/lib/stream.js b/lib/stream.js index e8f205c056834f..9a09401e7d016a 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -42,6 +42,7 @@ const { }, } = require('internal/errors'); const compose = require('internal/streams/compose'); +const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('internal/streams/state'); const { pipeline } = require('internal/streams/pipeline'); const { destroyer } = require('internal/streams/destroy'); const eos = require('internal/streams/end-of-stream'); @@ -105,6 +106,8 @@ Stream.addAbortSignal = addAbortSignal; Stream.finished = eos; Stream.destroy = destroyer; Stream.compose = compose; +Stream.setDefaultHighWaterMark = setDefaultHighWaterMark; +Stream.getDefaultHighWaterMark = getDefaultHighWaterMark; ObjectDefineProperty(Stream, 'promises', { __proto__: null, diff --git a/test/parallel/test-stream-set-default-hwm.js b/test/parallel/test-stream-set-default-hwm.js new file mode 100644 index 00000000000000..3d78907b74f5a5 --- /dev/null +++ b/test/parallel/test-stream-set-default-hwm.js @@ -0,0 +1,36 @@ +'use strict'; + +require('../common'); + +const assert = require('node:assert'); +const { + setDefaultHighWaterMark, + getDefaultHighWaterMark, + Writable, + Readable, + Transform +} = require('stream'); + +assert.notStrictEqual(getDefaultHighWaterMark(false), 32 * 1000); +setDefaultHighWaterMark(false, 32 * 1000); +assert.strictEqual(getDefaultHighWaterMark(false), 32 * 1000); + +assert.notStrictEqual(getDefaultHighWaterMark(true), 32); +setDefaultHighWaterMark(true, 32); +assert.strictEqual(getDefaultHighWaterMark(true), 32); + +const w = new Writable({ + write() {} +}); +assert.strictEqual(w.writableHighWaterMark, 32 * 1000); + +const r = new Readable({ + read() {} +}); +assert.strictEqual(r.readableHighWaterMark, 32 * 1000); + +const t = new Transform({ + transform() {} +}); +assert.strictEqual(t.writableHighWaterMark, 32 * 1000); +assert.strictEqual(t.readableHighWaterMark, 32 * 1000);