Skip to content

Commit

Permalink
feat: allow overriding hwm
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 12, 2023
1 parent b20405e commit bccb98f
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 6 deletions.
11 changes: 8 additions & 3 deletions lib/api/api-request.js
Expand Up @@ -16,13 +16,17 @@ class RequestHandler extends AsyncResource {
throw new InvalidArgumentError('invalid opts')
}

const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts
const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts

try {
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}

if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) {
throw new InvalidArgumentError('invalid highWaterMark')
}

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
Expand Down Expand Up @@ -53,6 +57,7 @@ class RequestHandler extends AsyncResource {
this.context = null
this.onInfo = onInfo || null
this.throwOnError = throwOnError
this.highWaterMark = highWaterMark

if (util.isStream(body)) {
body.on('error', (err) => {
Expand All @@ -73,7 +78,7 @@ class RequestHandler extends AsyncResource {
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { callback, opaque, abort, context } = this
const { callback, opaque, abort, context, highWaterMark } = this

if (statusCode < 200) {
if (this.onInfo) {
Expand All @@ -85,7 +90,7 @@ class RequestHandler extends AsyncResource {

const parsedHeaders = util.parseHeaders(rawHeaders)
const contentType = parsedHeaders['content-type']
const body = new Readable(resume, abort, contentType)
const body = new Readable({ resume, abort, contentType, highWaterMark })

this.callback = null
this.res = body
Expand Down
9 changes: 7 additions & 2 deletions lib/api/readable.js
Expand Up @@ -17,11 +17,16 @@ const kAbort = Symbol('abort')
const kContentType = Symbol('kContentType')

module.exports = class BodyReadable extends Readable {
constructor (resume, abort, contentType = '') {
constructor ({
resume,
abort,
contentType = '',
highWaterMark = 64 * 1024 // Same as nodejs fs streams.
}) {
super({
autoDestroy: true,
read: resume,
highWaterMark: 64 * 1024 // Same as nodejs fs streams.
highWaterMark
})

this._readableState.dataEmitted = false
Expand Down
23 changes: 23 additions & 0 deletions test/client-request.js
Expand Up @@ -75,6 +75,29 @@ test('request dump with abort signal', (t) => {
})
})

test('request hwm', (t) => {
t.plan(2)
const server = createServer((req, res) => {
res.write('hello')
})
t.teardown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))

client.request({
path: '/',
method: 'GET',
highWaterMark: 1000
}, (err, { body }) => {
t.error(err)
t.same(body.readableHighWaterMark, 1000)
body.dump()
})
})
})

test('request abort before headers', (t) => {
t.plan(6)

Expand Down
2 changes: 1 addition & 1 deletion test/readable.test.js
Expand Up @@ -8,7 +8,7 @@ test('avoid body reordering', async function (t) {
}
function abort () {
}
const r = new Readable(resume, abort)
const r = new Readable({ resume, abort })

r.push(Buffer.from('hello'))

Expand Down
2 changes: 2 additions & 0 deletions types/dispatcher.d.ts
Expand Up @@ -142,6 +142,8 @@ declare namespace Dispatcher {
onInfo?: (info: { statusCode: number, headers: Record<string, string | string[]> }) => void;
/** Default: `null` */
responseHeader?: 'raw' | null;
/** Default: `64K` */
highWaterMark?: number | null;
}
export interface PipelineOptions extends RequestOptions {
/** `true` if the `handler` will return an object stream. Default: `false` */
Expand Down

0 comments on commit bccb98f

Please sign in to comment.