Skip to content

Commit

Permalink
refactor: unify close/destroy (nodejs#1278)
Browse files Browse the repository at this point in the history
* refactor: unify close/destroy

* fixup

* fixup

* fixup
  • Loading branch information
ronag authored and metcoder95 committed Dec 26, 2022
1 parent 8f9249f commit 2770fb9
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 376 deletions.
128 changes: 30 additions & 98 deletions lib/agent.js
@@ -1,20 +1,14 @@
'use strict'

const {
ClientClosedError,
InvalidArgumentError,
ClientDestroyedError
} = require('./core/errors')
const { kClients, kRunning } = require('./core/symbols')
const Dispatcher = require('./dispatcher')
const { InvalidArgumentError } = require('./core/errors')
const { kClients, kRunning, kClose, kDestroy, kDispatch } = require('./core/symbols')
const DispatcherBase = require('./dispatcher-base')
const Pool = require('./pool')
const Client = require('./client')
const util = require('./core/util')
const RedirectHandler = require('./handler/redirect')
const { WeakRef, FinalizationRegistry } = require('./compat/dispatcher-weakref')()

const kDestroyed = Symbol('destroyed')
const kClosed = Symbol('closed')
const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')
Expand All @@ -30,7 +24,7 @@ function defaultFactory (origin, opts) {
: new Pool(origin, opts)
}

class Agent extends Dispatcher {
class Agent extends DispatcherBase {
constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) {
super()

Expand Down Expand Up @@ -60,8 +54,6 @@ class Agent extends Dispatcher {
this[kClients].delete(key)
}
})
this[kClosed] = false
this[kDestroyed] = false

const agent = this

Expand Down Expand Up @@ -94,76 +86,38 @@ class Agent extends Dispatcher {
return ret
}

dispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object.')
[kDispatch] (opts, handler) {
let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
} else {
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
}

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be an object.')
}

let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
} else {
throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
}

if (this[kDestroyed]) {
throw new ClientDestroyedError()
}

if (this[kClosed]) {
throw new ClientClosedError()
}
const ref = this[kClients].get(key)

const ref = this[kClients].get(key)

let dispatcher = ref ? ref.deref() : null
if (!dispatcher) {
dispatcher = this[kFactory](opts.origin, this[kOptions])
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])

this[kClients].set(key, new WeakRef(dispatcher))
this[kFinalizer].register(dispatcher, key)
}

const { maxRedirections = this[kMaxRedirections] } = opts
if (maxRedirections != null && maxRedirections !== 0) {
opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting.
handler = new RedirectHandler(this, maxRedirections, opts, handler)
}

return dispatcher.dispatch(opts, handler)
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
}
let dispatcher = ref ? ref.deref() : null
if (!dispatcher) {
dispatcher = this[kFactory](opts.origin, this[kOptions])
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])

handler.onError(err)
this[kClients].set(key, new WeakRef(dispatcher))
this[kFinalizer].register(dispatcher, key)
}
}

get closed () {
return this[kClosed]
}

get destroyed () {
return this[kDestroyed]
}

close (callback) {
if (callback != null && typeof callback !== 'function') {
throw new InvalidArgumentError('callback must be a function')
const { maxRedirections = this[kMaxRedirections] } = opts
if (maxRedirections != null && maxRedirections !== 0) {
opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting.
handler = new RedirectHandler(this, maxRedirections, opts, handler)
}

this[kClosed] = true
return dispatcher.dispatch(opts, handler)
}

async [kClose] () {
const closePromises = []
for (const ref of this[kClients].values()) {
const client = ref.deref()
Expand All @@ -173,27 +127,10 @@ class Agent extends Dispatcher {
}
}

if (!callback) {
return Promise.all(closePromises)
}

// Should never error.
Promise.all(closePromises).then(() => process.nextTick(callback))
await Promise.all(closePromises)
}

destroy (err, callback) {
if (typeof err === 'function') {
callback = err
err = null
}

if (callback != null && typeof callback !== 'function') {
throw new InvalidArgumentError('callback must be a function')
}

this[kClosed] = true
this[kDestroyed] = true

async [kDestroy] (err) {
const destroyPromises = []
for (const ref of this[kClients].values()) {
const client = ref.deref()
Expand All @@ -203,12 +140,7 @@ class Agent extends Dispatcher {
}
}

if (!callback) {
return Promise.all(destroyPromises)
}

// Should never error.
Promise.all(destroyPromises).then(() => process.nextTick(callback))
await Promise.all(destroyPromises)
}
}

Expand Down

0 comments on commit 2770fb9

Please sign in to comment.