diff --git a/lib/fetch/index.js b/lib/fetch/index.js index 0eae43cb28f..9bc4f1e77d9 100644 --- a/lib/fetch/index.js +++ b/lib/fetch/index.js @@ -48,7 +48,7 @@ const { } = require('./constants') const { kHeadersList } = require('../core/symbols') const EE = require('events') -const { PassThrough, pipeline } = require('stream') +const { Readable, pipeline } = require('stream') const { isErrored, isReadable } = require('../core/util') const { kIsMockActive } = require('../mock/mock-symbols') const { dataURLProcessor } = require('./dataURL') @@ -762,8 +762,8 @@ async function schemeFetch (fetchParams) { switch (scheme) { case 'about:': { // If request’s current URL’s path is the string "blank", then return a new response - // whose status message is `OK`, header list is « (`Content-Type`, `text/html;charset=utf-8`) », - // and body is the empty byte sequence. + // whose status message is `OK`, header list is « (`Content-Type`, `text/html;charset=utf-8`) », + // and body is the empty byte sequence. if (path === 'blank') { const resp = makeResponse({ statusText: 'OK', @@ -771,7 +771,7 @@ async function schemeFetch (fetchParams) { 'content-type', 'text/html;charset=utf-8' ] }) - + resp.urlList = [new URL('about:blank')] return resp } @@ -784,12 +784,12 @@ async function schemeFetch (fetchParams) { context.on('terminated', onRequestAborted) - // 1. Run these steps, but abort when the ongoing fetch is terminated: + // 1. Run these steps, but abort when the ongoing fetch is terminated: // 1a. Let blob be request’s current URL’s blob URL entry’s object. // https://w3c.github.io/FileAPI/#blob-url-entry // P.S. Thank God this method is available in node. const currentURL = requestCurrentURL(request) - + // https://github.com/web-platform-tests/wpt/blob/7b0ebaccc62b566a1965396e5be7bb2bc06f841f/FileAPI/url/resources/fetch-tests.js#L52-L56 // Buffer.resolveObjectURL does not ignore URL queries. if (currentURL.search.length !== 0) { @@ -803,7 +803,7 @@ async function schemeFetch (fetchParams) { return makeNetworkError('invalid method') } - // 3a. Let response be a new response whose status message is `OK`. + // 3a. Let response be a new response whose status message is `OK`. const response = makeResponse({ statusText: 'OK', urlList: [currentURL] }) // 4a. Append (`Content-Length`, blob’s size attribute value) to response’s header list. @@ -1721,7 +1721,7 @@ function httpNetworkFetch ( // 12. Let highWaterMark be a non-negative, non-NaN number, chosen by // the user agent. - const highWaterMark = 64 * 1024 // Same as nodejs fs streams. + const highWaterMark = 0 // We already buffer plenty in socket + stream + decoders. // 13. Let sizeAlgorithm be an algorithm that accepts a chunk object // and returns a non-negative, non-NaN, non-infinite number, chosen by the user agent. @@ -1735,19 +1735,12 @@ function httpNetworkFetch ( ReadableStream = require('stream/web').ReadableStream } - let pullResolve - const stream = new ReadableStream( { async start (controller) { context.controller = controller }, async pull (controller) { - if (!pullAlgorithm) { - await new Promise((resolve) => { - pullResolve = resolve - }) - } await pullAlgorithm(controller) }, async cancel (reason) { @@ -1816,7 +1809,7 @@ function httpNetworkFetch ( maxRedirections: 0 }, { - decoder: null, + body: null, abort: null, context, @@ -1876,18 +1869,16 @@ function httpNetworkFetch ( } } - if (decoders.length > 1) { - pipeline(...decoders, () => {}) - } else if (decoders.length === 0) { - // TODO (perf): Avoid intermediate. - decoders.push(new PassThrough()) - } - - this.decoder = decoders[0].on('drain', resume) + this.body = new Readable({ read: resume }) - const iterator = decoders[decoders.length - 1][Symbol.asyncIterator]() + const src = decoders.length + ? pipeline(this.body, ...decoders, () => {}) + : this.body.on('error', () => {}) + const iterator = src[Symbol.asyncIterator]() pullAlgorithm = async (controller) => { + // 1-3. See onData... + // 4. Set bytes to the result of handling content codings given // codings and bytes. let bytes @@ -1895,7 +1886,7 @@ function httpNetworkFetch ( const { done, value } = await iterator.next() bytes = done ? undefined : value } catch (err) { - if (this.decoder.writableEnded && !timingInfo.encodedBodySize) { + if (this.body._readableState.ended && !timingInfo.encodedBodySize) { // zlib doesn't like empty streams. bytes = undefined } else { @@ -1908,10 +1899,10 @@ function httpNetworkFetch ( // body is done normally and stream is readable, then close // stream, finalize response for fetchParams and response, and // abort these in-parallel steps. - finalizeResponse(fetchParams, response) - controller.close() + finalizeResponse(fetchParams, response) + return } @@ -1939,11 +1930,6 @@ function httpNetworkFetch ( return controller.desiredSize > 0 } - if (pullResolve) { - pullResolve() - pullResolve = null - } - resolve(response) return true @@ -1954,30 +1940,30 @@ function httpNetworkFetch ( return } - // 1. If one or more bytes have been transmitted from response’s - // message body, then: + // 1. If one or more bytes have been transmitted from response’s + // message body, then: - // 1. Let bytes be the transmitted bytes. + // 1. Let bytes be the transmitted bytes. const bytes = chunk - // 2. Let codings be the result of extracting header list values - // given `Content-Encoding` and response’s header list. - // See pullAlgorithm. + // 2. Let codings be the result of extracting header list values + // given `Content-Encoding` and response’s header list. + // See pullAlgorithm. - // 3. Increase timingInfo’s encoded body size by bytes’s length. + // 3. Increase timingInfo’s encoded body size by bytes’s length. timingInfo.encodedBodySize += bytes.byteLength - // 4. See pullAlgorithm... + // 4. See pullAlgorithm... - return this.decoder.write(bytes) + return this.body.push(bytes) }, onComplete () { - this.decoder.end() + this.body.push(null) }, onError (error) { - this.decoder?.destroy(error) + this.body?.destroy(error) this.context.terminate({ reason: error })