Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: stalling subscription on (node) http-client when daemon is stopp…
Browse files Browse the repository at this point in the history
…ed (#3468)

This change fixes #3465 by upgrading to a temporary fork of node-fetch with
node-fetch/node-fetch#1172 applied.

Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
Gozala and achingbrain committed Jun 1, 2021
1 parent e294067 commit 0266abf
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 20 deletions.
2 changes: 1 addition & 1 deletion examples/browser-ipns-publish/package.json
Expand Up @@ -16,7 +16,7 @@
"human-crypto-keys": "^0.1.4",
"ipfs": "^0.55.2",
"ipfs-http-client": "^50.1.0",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"ipns": "^0.11.0",
"it-last": "^1.0.4",
"p-retry": "^4.2.0",
Expand Down
2 changes: 1 addition & 1 deletion examples/custom-libp2p/package.json
Expand Up @@ -11,7 +11,7 @@
"license": "MIT",
"dependencies": {
"ipfs": "^0.55.2",
"libp2p": "^0.31.5",
"libp2p": "^0.31.6",
"libp2p-bootstrap": "^0.12.3",
"libp2p-kad-dht": "^0.22.0",
"libp2p-mdns": "^0.16.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-ipfs-core/package.json
Expand Up @@ -47,7 +47,7 @@
"err-code": "^3.0.1",
"ipfs-unixfs": "^4.0.3",
"ipfs-unixfs-importer": "^7.0.3",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"ipld-block": "^0.11.0",
"ipld-dag-cbor": "^1.0.0",
"ipld-dag-pb": "^0.22.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-cli/package.json
Expand Up @@ -43,7 +43,7 @@
"ipfs-daemon": "^0.7.0",
"ipfs-http-client": "^50.1.0",
"ipfs-repo": "^9.1.6",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"ipld-dag-cbor": "^1.0.0",
"ipld-dag-pb": "^0.22.1",
"it-all": "^1.0.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-core-utils/package.json
Expand Up @@ -48,7 +48,7 @@
"err-code": "^3.0.1",
"ipfs-core-types": "^0.5.0",
"ipfs-unixfs": "^4.0.3",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"it-all": "^1.0.4",
"it-map": "^1.0.4",
"it-peekable": "^1.0.1",
Expand Down
6 changes: 3 additions & 3 deletions packages/ipfs-core/package.json
Expand Up @@ -78,7 +78,7 @@
"ipfs-unixfs": "^4.0.3",
"ipfs-unixfs-exporter": "^5.0.3",
"ipfs-unixfs-importer": "^7.0.3",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"ipld": "^0.30.0",
"ipld-block": "^0.11.0",
"ipld-dag-cbor": "^1.0.0",
Expand All @@ -94,11 +94,11 @@
"it-map": "^1.0.4",
"it-pipe": "^1.1.0",
"just-safe-set": "^2.2.1",
"libp2p": "^0.31.5",
"libp2p": "^0.31.6",
"libp2p-bootstrap": "^0.12.3",
"libp2p-crypto": "^0.19.3",
"libp2p-floodsub": "^0.25.1",
"libp2p-gossipsub": "^0.9.0",
"libp2p-gossipsub": "^0.9.2",
"libp2p-kad-dht": "^0.22.0",
"libp2p-mdns": "^0.16.0",
"libp2p-mplex": "^0.10.2",
Expand Down
6 changes: 3 additions & 3 deletions packages/ipfs-daemon/package.json
Expand Up @@ -38,17 +38,17 @@
"ipfs-http-client": "^50.1.0",
"ipfs-http-gateway": "^0.4.1",
"ipfs-http-server": "^0.5.0",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"just-safe-set": "^2.2.1",
"libp2p": "^0.31.5",
"libp2p": "^0.31.6",
"libp2p-delegated-content-routing": "^0.10.0",
"libp2p-delegated-peer-routing": "^0.9.0",
"libp2p-webrtc-star": "^0.22.2",
"multiaddr": "^9.0.1"
},
"devDependencies": {
"aegir": "^33.0.0",
"node-fetch": "^2.6.1",
"node-fetch": "npm:@achingbrain/node-fetch@^2.6.4",
"ws": "^7.3.1"
},
"optionalDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-http-client/package.json
Expand Up @@ -52,7 +52,7 @@
"ipfs-core-types": "^0.5.0",
"ipfs-core-utils": "^0.8.1",
"ipfs-unixfs": "^4.0.3",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"ipld-block": "^0.11.0",
"ipld-dag-cbor": "^1.0.0",
"ipld-dag-pb": "^0.22.1",
Expand Down
30 changes: 23 additions & 7 deletions packages/ipfs-http-client/src/pubsub/subscribe.js
Expand Up @@ -62,7 +62,7 @@ module.exports = configure((api, options) => {
return
}

readMessages(response.ndjson(), {
readMessages(response, {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError
Expand All @@ -78,17 +78,17 @@ module.exports = configure((api, options) => {
})

/**
* @param {*} msgStream
* @param {import('ipfs-utils/src/types').ExtendedResponse} response
* @param {object} options
* @param {(message: Message) => void} options.onMessage
* @param {() => void} options.onEnd
* @param {ErrorHandlerFn} [options.onError]
*/
async function readMessages (msgStream, { onMessage, onEnd, onError }) {
async function readMessages (response, { onMessage, onEnd, onError }) {
onError = onError || log

try {
for await (const msg of msgStream) {
for await (const msg of response.ndjson()) {
try {
if (!msg.from) {
continue
Expand All @@ -106,12 +106,28 @@ async function readMessages (msgStream, { onMessage, onEnd, onError }) {
}
}
} catch (err) {
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
// Temporarily use the name property instead.
if (err.type !== 'aborted' && err.name !== 'AbortError') {
if (!isAbortError(err)) {
onError(err, true) // Fatal
}
} finally {
onEnd()
}
}

/**
* @param {Error & {type?:string}} error
* @returns {boolean}
*/
const isAbortError = error => {
switch (error.type) {
case 'aborted':
return true
// It is `abort` in Electron instead of `aborted`
case 'abort':
return true
default:
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
// Temporarily use the name property instead.
return error.name === 'AbortError'
}
}
84 changes: 84 additions & 0 deletions packages/ipfs-http-client/test/pubsub.spec.js
@@ -0,0 +1,84 @@
/* eslint-env mocha */
'use strict'

const { expect } = require('aegir/utils/chai')
const { AbortController } = require('native-abort-controller')

const f = require('./utils/factory')()

describe('.pubsub', function () {
this.timeout(20 * 1000)
describe('.subscribe', () => {
let ipfs
let ctl

beforeEach(async function () {
this.timeout(30 * 1000) // slow CI

ctl = await await f.spawn({
args: '--enable-pubsub-experiment'
})

ipfs = ctl.api
})

afterEach(() => f.clean())

it('.onError when connection is closed', async () => {
const topic = 'gossipboom'
let messageCount = 0
let onError
const error = new Promise(resolve => { onError = resolve })

await ipfs.pubsub.subscribe(topic, message => {
messageCount++

if (messageCount === 2) {
// Stop the daemon
ctl.stop().catch()
}
}, {
onError
})

await ipfs.pubsub.publish(topic, 'hello')
await ipfs.pubsub.publish(topic, 'bye')

await expect(error).to.eventually.be.fulfilled().and.to.be.instanceOf(Error)
})

it('does not call onError when aborted', async () => {
const controller = new AbortController()
const topic = 'gossipabort'
const messages = []
let onError
let onReceived

const received = new Promise(resolve => { onReceived = resolve })
const error = new Promise(resolve => { onError = resolve })

await ipfs.pubsub.subscribe(topic, message => {
messages.push(message)
if (messages.length === 2) {
onReceived()
}
}, {
onError,
signal: controller.signal
})

await ipfs.pubsub.publish(topic, 'hello')
await ipfs.pubsub.publish(topic, 'bye')

await received
controller.abort()

// Stop the daemon
await ctl.stop()
// Just to make sure no error is caused by above line
setTimeout(onError, 200, 'aborted')

await expect(error).to.eventually.be.fulfilled().and.to.equal('aborted')
})
})
})
2 changes: 1 addition & 1 deletion packages/ipfs/package.json
Expand Up @@ -57,7 +57,7 @@
"ipfs-core-types": "^0.5.0",
"ipfs-http-client": "^50.1.0",
"ipfs-interop": "^5.0.2",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"ipfsd-ctl": "^8.0.1",
"iso-url": "^1.0.0",
"libp2p-webrtc-star": "^0.22.2",
Expand Down

0 comments on commit 0266abf

Please sign in to comment.