Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: unify close/destroy #1278

Merged
merged 4 commits into from Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 30 additions & 98 deletions lib/agent.js
@@ -1,20 +1,14 @@
'use strict'

const {
ClientClosedError,
InvalidArgumentError,
ClientDestroyedError
} = require('./core/errors')
const { kClients, kRunning } = require('./core/symbols')
const Dispatcher = require('./dispatcher')
const { InvalidArgumentError } = require('./core/errors')
const { kClients, kRunning, kClose, kDestroy, kDispatch } = require('./core/symbols')
const DispatcherBase = require('./dispatcher-base')
const Pool = require('./pool')
const Client = require('./client')
const util = require('./core/util')
const RedirectHandler = require('./handler/redirect')
const { WeakRef, FinalizationRegistry } = require('./compat/dispatcher-weakref')()

const kDestroyed = Symbol('destroyed')
const kClosed = Symbol('closed')
const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')
Expand All @@ -30,7 +24,7 @@ function defaultFactory (origin, opts) {
: new Pool(origin, opts)
}

class Agent extends Dispatcher {
class Agent extends DispatcherBase {
constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) {
super()

Expand Down Expand Up @@ -60,8 +54,6 @@ class Agent extends Dispatcher {
this[kClients].delete(key)
}
})
this[kClosed] = false
this[kDestroyed] = false

const agent = this

Expand Down Expand Up @@ -94,76 +86,38 @@ class Agent extends Dispatcher {
return ret
}

dispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object.')
[kDispatch] (opts, handler) {
let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
} else {
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
}

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be an object.')
}

let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
} else {
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
}

if (this[kDestroyed]) {
throw new ClientDestroyedError()
}

if (this[kClosed]) {
throw new ClientClosedError()
}
const ref = this[kClients].get(key)

const ref = this[kClients].get(key)

let dispatcher = ref ? ref.deref() : null
if (!dispatcher) {
dispatcher = this[kFactory](opts.origin, this[kOptions])
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])

this[kClients].set(key, new WeakRef(dispatcher))
this[kFinalizer].register(dispatcher, key)
}

const { maxRedirections = this[kMaxRedirections] } = opts
if (maxRedirections != null && maxRedirections !== 0) {
opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting.
handler = new RedirectHandler(this, maxRedirections, opts, handler)
}

return dispatcher.dispatch(opts, handler)
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
}
let dispatcher = ref ? ref.deref() : null
if (!dispatcher) {
dispatcher = this[kFactory](opts.origin, this[kOptions])
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])

handler.onError(err)
this[kClients].set(key, new WeakRef(dispatcher))
this[kFinalizer].register(dispatcher, key)
}
}

get closed () {
return this[kClosed]
}

get destroyed () {
return this[kDestroyed]
}

close (callback) {
if (callback != null && typeof callback !== 'function') {
throw new InvalidArgumentError('callback must be a function')
const { maxRedirections = this[kMaxRedirections] } = opts
if (maxRedirections != null && maxRedirections !== 0) {
opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting.
handler = new RedirectHandler(this, maxRedirections, opts, handler)
}

this[kClosed] = true
return dispatcher.dispatch(opts, handler)
}

async [kClose] () {
const closePromises = []
for (const ref of this[kClients].values()) {
const client = ref.deref()
Expand All @@ -173,27 +127,10 @@ class Agent extends Dispatcher {
}
}

if (!callback) {
return Promise.all(closePromises)
}

// Should never error.
Promise.all(closePromises).then(() => process.nextTick(callback))
await Promise.all(closePromises)
}

destroy (err, callback) {
if (typeof err === 'function') {
callback = err
err = null
}

if (callback != null && typeof callback !== 'function') {
throw new InvalidArgumentError('callback must be a function')
}

this[kClosed] = true
this[kDestroyed] = true

async [kDestroy] (err) {
const destroyPromises = []
for (const ref of this[kClients].values()) {
const client = ref.deref()
Expand All @@ -203,12 +140,7 @@ class Agent extends Dispatcher {
}
}

if (!callback) {
return Promise.all(destroyPromises)
}

// Should never error.
Promise.all(destroyPromises).then(() => process.nextTick(callback))
await Promise.all(destroyPromises)
}
}

Expand Down