Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow overriding hwm #2057

Merged
merged 1 commit into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ class RequestHandler extends AsyncResource {
throw new InvalidArgumentError('invalid opts')
}

const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts
const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts

try {
if (typeof callback !== 'function') {
throw new InvalidArgumentError('invalid callback')
}

if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) {
throw new InvalidArgumentError('invalid highWaterMark')
}

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
}
Expand Down Expand Up @@ -53,6 +57,7 @@ class RequestHandler extends AsyncResource {
this.context = null
this.onInfo = onInfo || null
this.throwOnError = throwOnError
this.highWaterMark = highWaterMark

if (util.isStream(body)) {
body.on('error', (err) => {
Expand All @@ -73,7 +78,7 @@ class RequestHandler extends AsyncResource {
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const { callback, opaque, abort, context } = this
const { callback, opaque, abort, context, highWaterMark } = this

if (statusCode < 200) {
if (this.onInfo) {
Expand All @@ -85,7 +90,7 @@ class RequestHandler extends AsyncResource {

const parsedHeaders = util.parseHeaders(rawHeaders)
const contentType = parsedHeaders['content-type']
const body = new Readable(resume, abort, contentType)
const body = new Readable({ resume, abort, contentType, highWaterMark })

this.callback = null
this.res = body
Expand Down
9 changes: 7 additions & 2 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ const kAbort = Symbol('abort')
const kContentType = Symbol('kContentType')

module.exports = class BodyReadable extends Readable {
constructor (resume, abort, contentType = '') {
constructor ({
resume,
abort,
contentType = '',
highWaterMark = 64 * 1024 // Same as nodejs fs streams.
}) {
super({
autoDestroy: true,
read: resume,
highWaterMark: 64 * 1024 // Same as nodejs fs streams.
highWaterMark
})

this._readableState.dataEmitted = false
Expand Down
23 changes: 23 additions & 0 deletions test/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,29 @@ test('request dump with abort signal', (t) => {
})
})

test('request hwm', (t) => {
t.plan(2)
const server = createServer((req, res) => {
res.write('hello')
})
t.teardown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.teardown(client.destroy.bind(client))

client.request({
path: '/',
method: 'GET',
highWaterMark: 1000
}, (err, { body }) => {
t.error(err)
t.same(body.readableHighWaterMark, 1000)
body.dump()
})
})
})

test('request abort before headers', (t) => {
t.plan(6)

Expand Down
2 changes: 1 addition & 1 deletion test/readable.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ test('avoid body reordering', async function (t) {
}
function abort () {
}
const r = new Readable(resume, abort)
const r = new Readable({ resume, abort })

r.push(Buffer.from('hello'))

Expand Down
2 changes: 2 additions & 0 deletions types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ declare namespace Dispatcher {
onInfo?: (info: { statusCode: number, headers: Record<string, string | string[]> }) => void;
/** Default: `null` */
responseHeader?: 'raw' | null;
/** Default: `64 KiB` */
highWaterMark?: number;
}
export interface PipelineOptions extends RequestOptions {
/** `true` if the `handler` will return an object stream. Default: `false` */
Expand Down