From 921d7fc186eee883f9e7e6bc8bc32802fa9c4ad1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Dec 2021 19:39:46 +0100 Subject: [PATCH] fix: errored stream is disturbed (#1134) * fix: errored stream is disturbed * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup --- lib/core/util.js | 24 +++++++++++++++++------- lib/fetch/body.js | 9 +++------ lib/fetch/index.js | 8 ++++---- lib/fetch/symbols.js | 3 +-- test/node-fetch/main.js | 2 +- 5 files changed, 26 insertions(+), 20 deletions(-) diff --git a/lib/core/util.js b/lib/core/util.js index 63f13cbb406..69c7d44434c 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -150,7 +150,7 @@ function isDestroyed (stream) { return !stream || !!(stream.destroyed || stream[kDestroyed]) } -function isAborted (stream) { +function isReadableAborted (stream) { const state = stream && stream._readableState return isDestroyed(stream) && state && !state.endEmitted } @@ -244,15 +244,24 @@ function validateHandler (handler, method, upgrade) { // A body is disturbed if it has been read from and it cannot // be re-used without losing state or data. function isDisturbed (body) { - const state = body && body._readableState return !!(body && ( - (stream.isDisturbed && stream.isDisturbed(body)) || - body[kBodyUsed] || - body.readableDidRead || (state && state.dataEmitted) || - isAborted(body) + stream.isDisturbed + ? stream.isDisturbed(body) || body[kBodyUsed] // TODO (fix): Why is body[kBodyUsed] needed? + : body[kBodyUsed] || + body.readableDidRead || + (body._readableState && body._readableState.dataEmitted) || + isReadableAborted(body) )) } +function isErrored (body) { + return !!(body && ( + stream.isErrored + ? stream.isErrored(body) + : /state: 'errored'/.test(nodeUtil.inspect(body) + ))) +} + function getSocketInfo (socket) { return { localAddress: socket.localAddress, @@ -310,8 +319,9 @@ module.exports = { kEnumerableProperty, nop, isDisturbed, + isErrored, toUSVString: nodeUtil.toUSVString || ((val) => `${val}`), - isAborted, + isReadableAborted, isBlobLike, parseOrigin, parseURL, diff --git a/lib/fetch/body.js b/lib/fetch/body.js index 0cf88af74aa..9a858874958 100644 --- a/lib/fetch/body.js +++ b/lib/fetch/body.js @@ -3,12 +3,13 @@ const util = require('../core/util') const { ReadableStreamFrom, toUSVString, isBlobLike } = require('./util') const { FormData } = require('./formdata') -const { kState, kError } = require('./symbols') +const { kState } = require('./symbols') const { Blob } = require('buffer') const { kBodyUsed } = require('../core/symbols') const assert = require('assert') const nodeUtil = require('util') const { NotSupportedError } = require('../core/errors') +const { isErrored } = require('../core/util') let ReadableStream @@ -187,7 +188,7 @@ function extractBody (object, keepalive = false) { // Whenever one or more bytes are available and stream is not errored, // enqueue a Uint8Array wrapping an ArrayBuffer containing the available // bytes into stream. - if (!/state: 'errored'/.test(nodeUtil.inspect(stream))) { + if (!isErrored(stream)) { controller.enqueue(new Uint8Array(value)) } } @@ -268,10 +269,6 @@ const methods = { if (this[kState].body) { const stream = this[kState].body.stream - if (stream[kError]) { - throw stream[kError] - } - if (util.isDisturbed(stream)) { throw new TypeError('disturbed') } diff --git a/lib/fetch/index.js b/lib/fetch/index.js index 476264013ad..1a67fbc616d 100644 --- a/lib/fetch/index.js +++ b/lib/fetch/index.js @@ -31,7 +31,7 @@ const { determineRequestsReferrer, coarsenedSharedCurrentTime } = require('./util') -const { kState, kHeaders, kGuard, kRealm, kError } = require('./symbols') +const { kState, kHeaders, kGuard, kRealm } = require('./symbols') const { AbortError } = require('../core/errors') const assert = require('assert') const { safelyExtractBody, cancelBody } = require('./body') @@ -45,6 +45,7 @@ const { const { kHeadersList } = require('../core/symbols') const EE = require('events') const { PassThrough, pipeline, compose } = require('stream') +const { isErrored } = require('../core/util') let ReadableStream @@ -1531,7 +1532,6 @@ function httpNetworkFetch ( await pullAlgorithm(controller) }, async cancel (reason) { - stream[kError] = reason await cancelAlgorithm(reason) } }, @@ -1742,8 +1742,8 @@ function httpNetworkFetch ( controller.enqueue(new Uint8Array(bytes)) // 8. If stream is errored, then terminate the ongoing fetch. - if (stream[kError]) { - this.context.terminate({ reason: stream[kError] }) + if (isErrored(stream)) { + this.context.terminate() return } diff --git a/lib/fetch/symbols.js b/lib/fetch/symbols.js index 823a5d3c0c6..0b947d55bad 100644 --- a/lib/fetch/symbols.js +++ b/lib/fetch/symbols.js @@ -6,6 +6,5 @@ module.exports = { kSignal: Symbol('signal'), kState: Symbol('state'), kGuard: Symbol('guard'), - kRealm: Symbol('realm'), - kError: Symbol('error') + kRealm: Symbol('realm') } diff --git a/test/node-fetch/main.js b/test/node-fetch/main.js index fbae3ccf935..2bcaa59edb0 100644 --- a/test/node-fetch/main.js +++ b/test/node-fetch/main.js @@ -882,7 +882,7 @@ describe('node-fetch', () => { return expect(res.text()) .to.eventually.be.rejected .and.be.an.instanceof(Error) - .and.have.property('name', 'AbortError') + .and.have.property('name', 'TypeError') }) }) })