Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lazy decode body #1276

Merged
merged 12 commits into from Mar 19, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
74 changes: 30 additions & 44 deletions lib/fetch/index.js
Expand Up @@ -48,7 +48,7 @@ const {
} = require('./constants')
const { kHeadersList } = require('../core/symbols')
const EE = require('events')
const { PassThrough, pipeline } = require('stream')
const { Readable, pipeline } = require('stream')
const { isErrored, isReadable } = require('../core/util')
const { kIsMockActive } = require('../mock/mock-symbols')
const { dataURLProcessor } = require('./dataURL')
Expand Down Expand Up @@ -762,16 +762,16 @@ async function schemeFetch (fetchParams) {
switch (scheme) {
case 'about:': {
// If request’s current URL’s path is the string "blank", then return a new response
// whose status message is `OK`, header list is « (`Content-Type`, `text/html;charset=utf-8`) »,
// and body is the empty byte sequence.
// whose status message is `OK`, header list is « (`Content-Type`, `text/html;charset=utf-8`) »,
// and body is the empty byte sequence.
if (path === 'blank') {
const resp = makeResponse({
statusText: 'OK',
headersList: [
'content-type', 'text/html;charset=utf-8'
]
})

resp.urlList = [new URL('about:blank')]
return resp
}
Expand All @@ -784,12 +784,12 @@ async function schemeFetch (fetchParams) {

context.on('terminated', onRequestAborted)

// 1. Run these steps, but abort when the ongoing fetch is terminated:
// 1. Run these steps, but abort when the ongoing fetch is terminated:
// 1a. Let blob be request’s current URL’s blob URL entry’s object.
// https://w3c.github.io/FileAPI/#blob-url-entry
// P.S. Thank God this method is available in node.
const currentURL = requestCurrentURL(request)

// https://github.com/web-platform-tests/wpt/blob/7b0ebaccc62b566a1965396e5be7bb2bc06f841f/FileAPI/url/resources/fetch-tests.js#L52-L56
// Buffer.resolveObjectURL does not ignore URL queries.
if (currentURL.search.length !== 0) {
Expand All @@ -803,7 +803,7 @@ async function schemeFetch (fetchParams) {
return makeNetworkError('invalid method')
}

// 3a. Let response be a new response whose status message is `OK`.
// 3a. Let response be a new response whose status message is `OK`.
const response = makeResponse({ statusText: 'OK', urlList: [currentURL] })

// 4a. Append (`Content-Length`, blob’s size attribute value) to response’s header list.
Expand Down Expand Up @@ -1721,7 +1721,7 @@ function httpNetworkFetch (

// 12. Let highWaterMark be a non-negative, non-NaN number, chosen by
// the user agent.
const highWaterMark = 64 * 1024 // Same as nodejs fs streams.
const highWaterMark = 0 // We already buffer plenty in socket + stream + decoders.

// 13. Let sizeAlgorithm be an algorithm that accepts a chunk object
// and returns a non-negative, non-NaN, non-infinite number, chosen by the user agent.
Expand All @@ -1735,19 +1735,12 @@ function httpNetworkFetch (
ReadableStream = require('stream/web').ReadableStream
}

let pullResolve

const stream = new ReadableStream(
{
async start (controller) {
context.controller = controller
},
async pull (controller) {
if (!pullAlgorithm) {
await new Promise((resolve) => {
pullResolve = resolve
})
}
await pullAlgorithm(controller)
},
async cancel (reason) {
Expand Down Expand Up @@ -1816,7 +1809,7 @@ function httpNetworkFetch (
maxRedirections: 0
},
{
decoder: null,
body: null,
abort: null,
context,

Expand Down Expand Up @@ -1876,26 +1869,24 @@ function httpNetworkFetch (
}
}

if (decoders.length > 1) {
pipeline(...decoders, () => {})
} else if (decoders.length === 0) {
// TODO (perf): Avoid intermediate.
decoders.push(new PassThrough())
}

this.decoder = decoders[0].on('drain', resume)
this.body = new Readable({ read: resume })

const iterator = decoders[decoders.length - 1][Symbol.asyncIterator]()
const src = decoders.length
? pipeline(this.body, ...decoders, () => {})
: this.body.on('error', () => {})
const iterator = src[Symbol.asyncIterator]()

pullAlgorithm = async (controller) => {
// 1-3. See onData...

// 4. Set bytes to the result of handling content codings given
// codings and bytes.
let bytes
try {
const { done, value } = await iterator.next()
bytes = done ? undefined : value
} catch (err) {
if (this.decoder.writableEnded && !timingInfo.encodedBodySize) {
if (this.body._readableState.ended && !timingInfo.encodedBodySize) {
// zlib doesn't like empty streams.
bytes = undefined
} else {
Expand All @@ -1908,10 +1899,10 @@ function httpNetworkFetch (
// body is done normally and stream is readable, then close
// stream, finalize response for fetchParams and response, and
// abort these in-parallel steps.
finalizeResponse(fetchParams, response)

controller.close()

finalizeResponse(fetchParams, response)

return
}

Expand Down Expand Up @@ -1939,11 +1930,6 @@ function httpNetworkFetch (
return controller.desiredSize > 0
}

if (pullResolve) {
pullResolve()
pullResolve = null
}

resolve(response)

return true
Expand All @@ -1954,30 +1940,30 @@ function httpNetworkFetch (
return
}

// 1. If one or more bytes have been transmitted from response’s
// message body, then:
// 1. If one or more bytes have been transmitted from response’s
// message body, then:

// 1. Let bytes be the transmitted bytes.
// 1. Let bytes be the transmitted bytes.
const bytes = chunk

// 2. Let codings be the result of extracting header list values
// given `Content-Encoding` and response’s header list.
// See pullAlgorithm.
// 2. Let codings be the result of extracting header list values
// given `Content-Encoding` and response’s header list.
// See pullAlgorithm.

// 3. Increase timingInfo’s encoded body size by bytes’s length.
// 3. Increase timingInfo’s encoded body size by bytes’s length.
timingInfo.encodedBodySize += bytes.byteLength

// 4. See pullAlgorithm...
// 4. See pullAlgorithm...

return this.decoder.write(bytes)
return this.body.push(bytes)
},

onComplete () {
this.decoder.end()
this.body.push(null)
},

onError (error) {
this.decoder?.destroy(error)
this.body?.destroy(error)

this.context.terminate({ reason: error })

Expand Down