Skip to content

Commit

Permalink
fix: errored stream is disturbed (nodejs#1134)
Browse files Browse the repository at this point in the history
* fix: errored stream is disturbed

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup
  • Loading branch information
ronag authored and crysmags committed Feb 27, 2024
1 parent 1803143 commit 921d7fc
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 20 deletions.
24 changes: 17 additions & 7 deletions lib/core/util.js
Expand Up @@ -150,7 +150,7 @@ function isDestroyed (stream) {
return !stream || !!(stream.destroyed || stream[kDestroyed])
}

function isAborted (stream) {
function isReadableAborted (stream) {
const state = stream && stream._readableState
return isDestroyed(stream) && state && !state.endEmitted
}
Expand Down Expand Up @@ -244,15 +244,24 @@ function validateHandler (handler, method, upgrade) {
// A body is disturbed if it has been read from and it cannot
// be re-used without losing state or data.
function isDisturbed (body) {
const state = body && body._readableState
return !!(body && (
(stream.isDisturbed && stream.isDisturbed(body)) ||
body[kBodyUsed] ||
body.readableDidRead || (state && state.dataEmitted) ||
isAborted(body)
stream.isDisturbed
? stream.isDisturbed(body) || body[kBodyUsed] // TODO (fix): Why is body[kBodyUsed] needed?
: body[kBodyUsed] ||
body.readableDidRead ||
(body._readableState && body._readableState.dataEmitted) ||
isReadableAborted(body)
))
}

function isErrored (body) {
return !!(body && (
stream.isErrored
? stream.isErrored(body)
: /state: 'errored'/.test(nodeUtil.inspect(body)
)))
}

function getSocketInfo (socket) {
return {
localAddress: socket.localAddress,
Expand Down Expand Up @@ -310,8 +319,9 @@ module.exports = {
kEnumerableProperty,
nop,
isDisturbed,
isErrored,
toUSVString: nodeUtil.toUSVString || ((val) => `${val}`),
isAborted,
isReadableAborted,
isBlobLike,
parseOrigin,
parseURL,
Expand Down
9 changes: 3 additions & 6 deletions lib/fetch/body.js
Expand Up @@ -3,12 +3,13 @@
const util = require('../core/util')
const { ReadableStreamFrom, toUSVString, isBlobLike } = require('./util')
const { FormData } = require('./formdata')
const { kState, kError } = require('./symbols')
const { kState } = require('./symbols')
const { Blob } = require('buffer')
const { kBodyUsed } = require('../core/symbols')
const assert = require('assert')
const nodeUtil = require('util')
const { NotSupportedError } = require('../core/errors')
const { isErrored } = require('../core/util')

let ReadableStream

Expand Down Expand Up @@ -187,7 +188,7 @@ function extractBody (object, keepalive = false) {
// Whenever one or more bytes are available and stream is not errored,
// enqueue a Uint8Array wrapping an ArrayBuffer containing the available
// bytes into stream.
if (!/state: 'errored'/.test(nodeUtil.inspect(stream))) {
if (!isErrored(stream)) {
controller.enqueue(new Uint8Array(value))
}
}
Expand Down Expand Up @@ -268,10 +269,6 @@ const methods = {
if (this[kState].body) {
const stream = this[kState].body.stream

if (stream[kError]) {
throw stream[kError]
}

if (util.isDisturbed(stream)) {
throw new TypeError('disturbed')
}
Expand Down
8 changes: 4 additions & 4 deletions lib/fetch/index.js
Expand Up @@ -31,7 +31,7 @@ const {
determineRequestsReferrer,
coarsenedSharedCurrentTime
} = require('./util')
const { kState, kHeaders, kGuard, kRealm, kError } = require('./symbols')
const { kState, kHeaders, kGuard, kRealm } = require('./symbols')
const { AbortError } = require('../core/errors')
const assert = require('assert')
const { safelyExtractBody, cancelBody } = require('./body')
Expand All @@ -45,6 +45,7 @@ const {
const { kHeadersList } = require('../core/symbols')
const EE = require('events')
const { PassThrough, pipeline, compose } = require('stream')
const { isErrored } = require('../core/util')

let ReadableStream

Expand Down Expand Up @@ -1531,7 +1532,6 @@ function httpNetworkFetch (
await pullAlgorithm(controller)
},
async cancel (reason) {
stream[kError] = reason
await cancelAlgorithm(reason)
}
},
Expand Down Expand Up @@ -1742,8 +1742,8 @@ function httpNetworkFetch (
controller.enqueue(new Uint8Array(bytes))

// 8. If stream is errored, then terminate the ongoing fetch.
if (stream[kError]) {
this.context.terminate({ reason: stream[kError] })
if (isErrored(stream)) {
this.context.terminate()
return
}

Expand Down
3 changes: 1 addition & 2 deletions lib/fetch/symbols.js
Expand Up @@ -6,6 +6,5 @@ module.exports = {
kSignal: Symbol('signal'),
kState: Symbol('state'),
kGuard: Symbol('guard'),
kRealm: Symbol('realm'),
kError: Symbol('error')
kRealm: Symbol('realm')
}
2 changes: 1 addition & 1 deletion test/node-fetch/main.js
Expand Up @@ -882,7 +882,7 @@ describe('node-fetch', () => {
return expect(res.text())
.to.eventually.be.rejected
.and.be.an.instanceof(Error)
.and.have.property('name', 'AbortError')
.and.have.property('name', 'TypeError')
})
})
})
Expand Down

0 comments on commit 921d7fc

Please sign in to comment.