diff --git a/lib/agent.js b/lib/agent.js index 30ac4ee1181..47aa2365e61 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -1,20 +1,14 @@ 'use strict' -const { - ClientClosedError, - InvalidArgumentError, - ClientDestroyedError -} = require('./core/errors') -const { kClients, kRunning } = require('./core/symbols') -const Dispatcher = require('./dispatcher') +const { InvalidArgumentError } = require('./core/errors') +const { kClients, kRunning, kClose, kDestroy, kDispatch } = require('./core/symbols') +const DispatcherBase = require('./dispatcher-base') const Pool = require('./pool') const Client = require('./client') const util = require('./core/util') const RedirectHandler = require('./handler/redirect') const { WeakRef, FinalizationRegistry } = require('./compat/dispatcher-weakref')() -const kDestroyed = Symbol('destroyed') -const kClosed = Symbol('closed') const kOnConnect = Symbol('onConnect') const kOnDisconnect = Symbol('onDisconnect') const kOnConnectionError = Symbol('onConnectionError') @@ -30,7 +24,7 @@ function defaultFactory (origin, opts) { : new Pool(origin, opts) } -class Agent extends Dispatcher { +class Agent extends DispatcherBase { constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) { super() @@ -60,8 +54,6 @@ class Agent extends Dispatcher { this[kClients].delete(key) } }) - this[kClosed] = false - this[kDestroyed] = false const agent = this @@ -94,76 +86,38 @@ class Agent extends Dispatcher { return ret } - dispatch (opts, handler) { - if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler must be an object.') + [kDispatch] (opts, handler) { + let key + if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) { + key = String(opts.origin) + } else { + throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.') } - try { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('opts must be an object.') - } - - let key - if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) { - key = String(opts.origin) - } else { - throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.') - } - - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (this[kClosed]) { - throw new ClientClosedError() - } + const ref = this[kClients].get(key) - const ref = this[kClients].get(key) - - let dispatcher = ref ? ref.deref() : null - if (!dispatcher) { - dispatcher = this[kFactory](opts.origin, this[kOptions]) - .on('drain', this[kOnDrain]) - .on('connect', this[kOnConnect]) - .on('disconnect', this[kOnDisconnect]) - .on('connectionError', this[kOnConnectionError]) - - this[kClients].set(key, new WeakRef(dispatcher)) - this[kFinalizer].register(dispatcher, key) - } - - const { maxRedirections = this[kMaxRedirections] } = opts - if (maxRedirections != null && maxRedirections !== 0) { - opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting. - handler = new RedirectHandler(this, maxRedirections, opts, handler) - } - - return dispatcher.dispatch(opts, handler) - } catch (err) { - if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') - } + let dispatcher = ref ? ref.deref() : null + if (!dispatcher) { + dispatcher = this[kFactory](opts.origin, this[kOptions]) + .on('drain', this[kOnDrain]) + .on('connect', this[kOnConnect]) + .on('disconnect', this[kOnDisconnect]) + .on('connectionError', this[kOnConnectionError]) - handler.onError(err) + this[kClients].set(key, new WeakRef(dispatcher)) + this[kFinalizer].register(dispatcher, key) } - } - - get closed () { - return this[kClosed] - } - - get destroyed () { - return this[kDestroyed] - } - close (callback) { - if (callback != null && typeof callback !== 'function') { - throw new InvalidArgumentError('callback must be a function') + const { maxRedirections = this[kMaxRedirections] } = opts + if (maxRedirections != null && maxRedirections !== 0) { + opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting. + handler = new RedirectHandler(this, maxRedirections, opts, handler) } - this[kClosed] = true + return dispatcher.dispatch(opts, handler) + } + async [kClose] () { const closePromises = [] for (const ref of this[kClients].values()) { const client = ref.deref() @@ -173,27 +127,10 @@ class Agent extends Dispatcher { } } - if (!callback) { - return Promise.all(closePromises) - } - - // Should never error. - Promise.all(closePromises).then(() => process.nextTick(callback)) + await Promise.all(closePromises) } - destroy (err, callback) { - if (typeof err === 'function') { - callback = err - err = null - } - - if (callback != null && typeof callback !== 'function') { - throw new InvalidArgumentError('callback must be a function') - } - - this[kClosed] = true - this[kDestroyed] = true - + async [kDestroy] (err) { const destroyPromises = [] for (const ref of this[kClients].values()) { const client = ref.deref() @@ -203,12 +140,7 @@ class Agent extends Dispatcher { } } - if (!callback) { - return Promise.all(destroyPromises) - } - - // Should never error. - Promise.all(destroyPromises).then(() => process.nextTick(callback)) + await Promise.all(destroyPromises) } } diff --git a/lib/client.js b/lib/client.js index 55d9afabf95..d3d4cfc705d 100644 --- a/lib/client.js +++ b/lib/client.js @@ -6,7 +6,7 @@ const assert = require('assert') const net = require('net') const util = require('./core/util') const Request = require('./core/request') -const Dispatcher = require('./dispatcher') +const DispatcherBase = require('./dispatcher-base') const RedirectHandler = require('./handler/redirect') const { RequestContentLengthMismatchError, @@ -16,8 +16,6 @@ const { RequestAbortedError, HeadersTimeoutError, HeadersOverflowError, - ClientDestroyedError, - ClientClosedError, SocketError, InformationalError, BodyTimeoutError, @@ -45,12 +43,9 @@ const { kNoRef, kKeepAliveDefaultTimeout, kHostHeader, - kClosed, - kDestroyed, kPendingIdx, kRunningIdx, kError, - kOnDestroyed, kPipelining, kSocket, kKeepAliveTimeoutValue, @@ -63,9 +58,14 @@ const { kConnector, kMaxRedirections, kMaxRequests, - kCounter + kCounter, + kClose, + kDestroy, + kDispatch } = require('./core/symbols') +const kClosedResolve = Symbol('kClosedResolve') + const channels = {} try { @@ -81,7 +81,7 @@ try { channels.connected = { hasSubscribers: false } } -class Client extends Dispatcher { +class Client extends DispatcherBase { constructor (url, { maxHeaderSize, headersTimeout, @@ -189,10 +189,7 @@ class Client extends Dispatcher { this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout] - this[kClosed] = false - this[kDestroyed] = false this[kServerName] = null - this[kOnDestroyed] = [] this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n` @@ -201,6 +198,7 @@ class Client extends Dispatcher { this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength this[kMaxRedirections] = maxRedirections this[kMaxRequests] = maxRequestsPerClient + this[kClosedResolve] = null // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. @@ -216,25 +214,15 @@ class Client extends Dispatcher { this[kPendingIdx] = 0 } - // TODO: Make private? get pipelining () { return this[kPipelining] } - // TODO: Make private? set pipelining (value) { this[kPipelining] = value resume(this, true) } - get destroyed () { - return this[kDestroyed] - } - - get closed () { - return this[kClosed] - } - get [kPending] () { return this[kQueue].length - this[kPendingIdx] } @@ -266,141 +254,68 @@ class Client extends Dispatcher { this.once('connect', cb) } - dispatch (opts, handler) { - if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler must be an object') - } - - try { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('opts must be an object.') - } - - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (this[kClosed]) { - throw new ClientClosedError() - } - - const { maxRedirections = this[kMaxRedirections] } = opts - if (maxRedirections) { - handler = new RedirectHandler(this, maxRedirections, opts, handler) - } - - const origin = opts.origin || this[kUrl].origin - - const request = new Request(origin, opts, handler) - - this[kQueue].push(request) - if (this[kResuming]) { - // Do nothing. - } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { - // Wait a tick in case stream/iterator is ended in the same tick. - this[kResuming] = 1 - process.nextTick(resume, this) - } else { - resume(this, true) - } - - if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { - this[kNeedDrain] = 2 - } - } catch (err) { - if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') - } - - handler.onError(err) - } - - return this[kNeedDrain] < 2 - } - - close (callback) { - if (callback === undefined) { - return new Promise((resolve, reject) => { - this.close((err, data) => { - return err ? reject(err) : resolve(data) - }) - }) - } - - if (typeof callback !== 'function') { - throw new InvalidArgumentError('invalid callback') + [kDispatch] (opts, handler) { + const { maxRedirections = this[kMaxRedirections] } = opts + if (maxRedirections) { + handler = new RedirectHandler(this, maxRedirections, opts, handler) } - if (this[kDestroyed]) { - queueMicrotask(() => callback(new ClientDestroyedError(), null)) - return - } + const origin = opts.origin || this[kUrl].origin - this[kClosed] = true + const request = new Request(origin, opts, handler) - if (!this[kSize]) { - this.destroy(callback) + this[kQueue].push(request) + if (this[kResuming]) { + // Do nothing. + } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { + // Wait a tick in case stream/iterator is ended in the same tick. + this[kResuming] = 1 + process.nextTick(resume, this) } else { - this[kOnDestroyed].push(callback) + resume(this, true) } - } - destroy (err, callback) { - if (typeof err === 'function') { - callback = err - err = null + if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { + this[kNeedDrain] = 2 } - if (callback === undefined) { - return new Promise((resolve, reject) => { - this.destroy(err, (err, data) => { - return err ? /* istanbul ignore next: should never error */ reject(err) : resolve(data) - }) - }) - } - - if (typeof callback !== 'function') { - throw new InvalidArgumentError('invalid callback') - } + return this[kNeedDrain] < 2 + } - if (this[kDestroyed]) { - if (this[kOnDestroyed]) { - this[kOnDestroyed].push(callback) + async [kClose] () { + return new Promise((resolve) => { + if (!this[kSize]) { + this.destroy(resolve) } else { - queueMicrotask(() => callback(null, null)) + this[kClosedResolve] = resolve } - return - } - - if (!err) { - err = new ClientDestroyedError() - } - - const requests = this[kQueue].splice(this[kPendingIdx]) - for (let i = 0; i < requests.length; i++) { - const request = requests[i] - errorRequest(this, request, err) - } + }) + } - this[kClosed] = true - this[kDestroyed] = true - this[kOnDestroyed].push(callback) + async [kDestroy] (err) { + return new Promise((resolve) => { + const requests = this[kQueue].splice(this[kPendingIdx]) + for (let i = 0; i < requests.length; i++) { + const request = requests[i] + errorRequest(this, request, err) + } - const onDestroyed = () => { - const callbacks = this[kOnDestroyed] - this[kOnDestroyed] = null - for (let i = 0; i < callbacks.length; i++) { - callbacks[i](null, null) + const callback = () => { + if (this[kClosedResolve]) { + this[kClosedResolve]() + this[kClosedResolve] = null + } + resolve() } - } - if (!this[kSocket]) { - queueMicrotask(onDestroyed) - } else { - util.destroy(this[kSocket].on('close', onDestroyed), err) - } + if (!this[kSocket]) { + queueMicrotask(callback) + } else { + util.destroy(this[kSocket].on('close', callback), err) + } - resume(this) + resume(this) + }) } } @@ -476,7 +391,6 @@ async function lazyllhttp () { let llhttpInstance = null let llhttpPromise = lazyllhttp() .catch(() => { - // TODO: Emit warning? }) let currentParser = null @@ -586,7 +500,6 @@ class Parser { currentBufferPtr = llhttp.malloc(currentBufferSize) } - // TODO (perf): Can we avoid this copy somehow? new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data) // Call `execute` on the wasm parser. @@ -635,12 +548,10 @@ class Parser { try { try { currentParser = this - this.llhttp.llhttp_finish(this.ptr) // TODO (fix): Check ret? } finally { currentParser = null } } catch (err) { - // TODO (fix): What if socket is already destroyed? Error will be swallowed. /* istanbul ignore next: difficult to make a test case for */ util.destroy(this.socket, err) } @@ -782,13 +693,9 @@ class Parser { return -1 } - // TODO: Check for content-length mismatch from server? - assert(!this.upgrade) assert(this.statusCode < 200) - // TODO: More statusCode validation? - if (statusCode === 100) { util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket))) return -1 @@ -979,7 +886,6 @@ class Parser { util.destroy(socket, new InformationalError('reset')) return constants.ERROR.PAUSED } else if (!shouldKeepAlive) { - // TODO: What if running > 0? util.destroy(socket, new InformationalError('reset')) return constants.ERROR.PAUSED } else if (socket[kReset] && client[kRunning] === 0) { @@ -1079,7 +985,7 @@ function onSocketClose () { client[kSocket] = null - if (client[kDestroyed]) { + if (client.destroyed) { assert(client[kPending] === 0) // Fail entire queue. @@ -1251,14 +1157,14 @@ function resume (client, sync) { function _resume (client, sync) { while (true) { - if (client[kDestroyed]) { + if (client.destroyed) { assert(client[kPending] === 0) return } - if (client[kClosed] && !client[kSize]) { - client.destroy(util.nop) - continue + if (client.closed && !client[kSize]) { + client.destroy() + return } const socket = client[kSocket] @@ -1479,8 +1385,6 @@ function write (client, request) { socket[kBlocking] = true } - // TODO: Expect: 100-continue - let header = `${method} ${path} HTTP/1.1\r\n` if (typeof host === 'string') { @@ -1600,7 +1504,6 @@ function writeStream ({ body, client, request, socket, contentLength, header, ex writer.destroy(err) - // TODO (fix): Avoid using err.message for logic. if (err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')) { util.destroy(body, err) } else { @@ -1679,7 +1582,6 @@ async function writeIterable ({ body, client, request, socket, contentLength, he const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) try { - // TODO (fix): What if socket errors while waiting for body? // It's up to the user to somehow abort the async iterable. for await (const chunk of body) { if (socket[kError]) { @@ -1730,7 +1632,6 @@ class AsyncWriter { return true } - // TODO: What if not ended and bytesWritten === contentLength? // We should defer writing chunks. if (contentLength !== null && bytesWritten + len > contentLength) { if (client[kStrictContentLength]) { @@ -1800,7 +1701,6 @@ class AsyncWriter { } } - // TODO (fix): Add comment clarifying what this does? if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) { // istanbul ignore else: only for jest if (socket[kParser].timeout.refresh) { diff --git a/lib/core/symbols.js b/lib/core/symbols.js index 1d28bc15e0c..30108827a84 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -1,4 +1,7 @@ module.exports = { + kClose: Symbol('close'), + kDestroy: Symbol('destroy'), + kDispatch: Symbol('dispatch'), kUrl: Symbol('url'), kWriting: Symbol('writing'), kResuming: Symbol('resuming'), diff --git a/lib/dispatcher-base.js b/lib/dispatcher-base.js new file mode 100644 index 00000000000..2c12ba80f35 --- /dev/null +++ b/lib/dispatcher-base.js @@ -0,0 +1,159 @@ +'use strict' + +const Dispatcher = require('./dispatcher') +const { + ClientDestroyedError, + ClientClosedError, + InvalidArgumentError +} = require('./core/errors') +const { kDestroy, kClose, kDispatch } = require('./core/symbols') + +const kDestroyed = Symbol('destroyed') +const kClosed = Symbol('closed') +const kOnDestroyed = Symbol('onDestroyed') +const kOnClosed = Symbol('onClosed') + +class DispatcherBase extends Dispatcher { + constructor () { + super() + + this[kDestroyed] = false + this[kOnDestroyed] = [] + this[kClosed] = false + this[kOnClosed] = [] + } + + get destroyed () { + return this[kDestroyed] + } + + get closed () { + return this[kClosed] + } + + close (callback) { + if (callback === undefined) { + return new Promise((resolve, reject) => { + this.close((err, data) => { + return err ? reject(err) : resolve(data) + }) + }) + } + + if (typeof callback !== 'function') { + throw new InvalidArgumentError('invalid callback') + } + + if (this[kDestroyed]) { + queueMicrotask(() => callback(new ClientDestroyedError(), null)) + return + } + + if (this[kClosed]) { + if (this[kOnClosed]) { + this[kOnClosed].push(callback) + } else { + queueMicrotask(() => callback(null, null)) + } + return + } + + this[kClosed] = true + this[kOnClosed].push(callback) + + const onClosed = () => { + const callbacks = this[kOnClosed] + this[kOnClosed] = null + for (let i = 0; i < callbacks.length; i++) { + callbacks[i](null, null) + } + } + + // Should not error. + this[kClose]() + .then(() => this.destroy()) + .then(() => { + queueMicrotask(onClosed) + }) + } + + destroy (err, callback) { + if (typeof err === 'function') { + callback = err + err = null + } + + if (callback === undefined) { + return new Promise((resolve, reject) => { + this.destroy(err, (err, data) => { + return err ? /* istanbul ignore next: should never error */ reject(err) : resolve(data) + }) + }) + } + + if (typeof callback !== 'function') { + throw new InvalidArgumentError('invalid callback') + } + + if (this[kDestroyed]) { + if (this[kOnDestroyed]) { + this[kOnDestroyed].push(callback) + } else { + queueMicrotask(() => callback(null, null)) + } + return + } + + if (!err) { + err = new ClientDestroyedError() + } + + this[kDestroyed] = true + this[kOnDestroyed].push(callback) + + const onDestroyed = () => { + const callbacks = this[kOnDestroyed] + this[kOnDestroyed] = null + for (let i = 0; i < callbacks.length; i++) { + callbacks[i](null, null) + } + } + + // Should not error. + this[kDestroy](err).then(() => { + queueMicrotask(onDestroyed) + }) + } + + dispatch (opts, handler) { + if (!handler || typeof handler !== 'object') { + throw new InvalidArgumentError('handler must be an object') + } + + try { + if (!opts || typeof opts !== 'object') { + throw new InvalidArgumentError('opts must be an object.') + } + + if (this[kDestroyed]) { + throw new ClientDestroyedError() + } + + if (this[kClosed]) { + throw new ClientClosedError() + } + + return this[kDispatch](opts, handler) + } catch (err) { + if (typeof handler.onError !== 'function') { + throw new InvalidArgumentError('invalid onError method') + } + + handler.onError(err) + + return false + } + } +} + +module.exports = DispatcherBase diff --git a/lib/pool-base.js b/lib/pool-base.js index 274280f835d..2a909eee083 100644 --- a/lib/pool-base.js +++ b/lib/pool-base.js @@ -1,20 +1,13 @@ 'use strict' -const Dispatcher = require('./dispatcher') -const { - ClientDestroyedError, - ClientClosedError, - InvalidArgumentError -} = require('./core/errors') +const DispatcherBase = require('./dispatcher-base') const FixedQueue = require('./node/fixed-queue') -const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl } = require('./core/symbols') +const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols') const PoolStats = require('./pool-stats') const kClients = Symbol('clients') const kNeedDrain = Symbol('needDrain') const kQueue = Symbol('queue') -const kDestroyed = Symbol('destroyed') -const kClosedPromise = Symbol('closed promise') const kClosedResolve = Symbol('closed resolve') const kOnDrain = Symbol('onDrain') const kOnConnect = Symbol('onConnect') @@ -25,16 +18,12 @@ const kAddClient = Symbol('add client') const kRemoveClient = Symbol('remove client') const kStats = Symbol('stats') -class PoolBase extends Dispatcher { +class PoolBase extends DispatcherBase { constructor () { super() this[kQueue] = new FixedQueue() - this[kClosedPromise] = null - this[kClosedResolve] = null - this[kDestroyed] = false this[kClients] = [] - this[kNeedDrain] = false this[kQueued] = 0 const pool = this @@ -122,59 +111,17 @@ class PoolBase extends Dispatcher { return this[kStats] } - get destroyed () { - return this[kDestroyed] - } - - get closed () { - return this[kClosedPromise] != null - } - - close (cb) { - try { - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (!this[kClosedPromise]) { - if (this[kQueue].isEmpty()) { - this[kClosedPromise] = Promise.all(this[kClients].map(c => c.close())) - } else { - this[kClosedPromise] = new Promise((resolve) => { - this[kClosedResolve] = resolve - }) - } - this[kClosedPromise] = this[kClosedPromise].then(() => { - this[kDestroyed] = true - }) - } - - if (cb) { - this[kClosedPromise].then(() => cb(null, null)) - } else { - return this[kClosedPromise] - } - } catch (err) { - if (cb) { - cb(err) - } else { - return Promise.reject(err) - } + async [kClose] () { + if (this[kQueue].isEmpty()) { + return Promise.all(this[kClients].map(c => c.close())) + } else { + return new Promise((resolve) => { + this[kClosedResolve] = resolve + }) } } - destroy (err, cb) { - this[kDestroyed] = true - - if (typeof err === 'function') { - cb = err - err = null - } - - if (!err) { - err = new ClientDestroyedError() - } - + async [kDestroy] (err) { while (true) { const item = this[kQueue].shift() if (!item) { @@ -183,44 +130,19 @@ class PoolBase extends Dispatcher { item.handler.onError(err) } - const promise = Promise.all(this[kClients].map(c => c.destroy(err))) - if (cb) { - promise.then(() => cb(null, null)) - } else { - return promise - } + return Promise.all(this[kClients].map(c => c.destroy(err))) } - dispatch (opts, handler) { - if (!handler || typeof handler !== 'object') { - throw new InvalidArgumentError('handler must be an object') - } - - try { - if (this[kDestroyed]) { - throw new ClientDestroyedError() - } - - if (this[kClosedPromise]) { - throw new ClientClosedError() - } - - const dispatcher = this[kGetDispatcher]() - - if (!dispatcher) { - this[kNeedDrain] = true - this[kQueue].push({ opts, handler }) - this[kQueued]++ - } else if (!dispatcher.dispatch(opts, handler)) { - dispatcher[kNeedDrain] = true - this[kNeedDrain] = !this[kGetDispatcher]() - } - } catch (err) { - if (typeof handler.onError !== 'function') { - throw new InvalidArgumentError('invalid onError method') - } + [kDispatch] (opts, handler) { + const dispatcher = this[kGetDispatcher]() - handler.onError(err) + if (!dispatcher) { + this[kNeedDrain] = true + this[kQueue].push({ opts, handler }) + this[kQueued]++ + } else if (!dispatcher.dispatch(opts, handler)) { + dispatcher[kNeedDrain] = true + this[kNeedDrain] = !this[kGetDispatcher]() } return !this[kNeedDrain] diff --git a/lib/proxy-agent.js b/lib/proxy-agent.js index b0dec86ac00..ee674df646f 100644 --- a/lib/proxy-agent.js +++ b/lib/proxy-agent.js @@ -1,14 +1,14 @@ 'use strict' -const { kProxy } = require('./core/symbols') +const { kProxy, kClose, kDestroy } = require('./core/symbols') const { URL } = require('url') const Agent = require('./agent') -const Dispatcher = require('./dispatcher') +const DispatcherBase = require('./dispatcher-base') const { InvalidArgumentError } = require('./core/errors') const kAgent = Symbol('proxy agent') -class ProxyAgent extends Dispatcher { +class ProxyAgent extends DispatcherBase { constructor (opts) { super(opts) this[kProxy] = buildProxyOptions(opts) @@ -31,9 +31,13 @@ class ProxyAgent extends Dispatcher { ) } - async close () { + async [kClose] () { await this[kAgent].close() } + + async [kDestroy] () { + await this[kAgent].destroy() + } } function buildProxyOptions (opts) { diff --git a/test/agent.js b/test/agent.js index e90bbb3f76c..f341a16dea8 100644 --- a/test/agent.js +++ b/test/agent.js @@ -71,7 +71,7 @@ test('agent should call callback after closing internal pools', t => { t.fail('second request should not resolve') }) .catch(err => { - t.type(err, errors.ClientClosedError) + t.type(err, errors.ClientDestroyedError) }) }) }) @@ -120,7 +120,7 @@ test('agent should close internal pools', t => { t.fail('second request should not resolve') }) .catch(err => { - t.type(err, errors.ClientClosedError) + t.type(err, errors.ClientDestroyedError) }) }) }) diff --git a/test/client-request.js b/test/client-request.js index 1c357dccc06..1956dc61bb1 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -554,11 +554,12 @@ test('request onInfo callback headers parsing', async (t) => { const client = new Client(`http://localhost:${server.address().port}`) t.teardown(client.close.bind(client)) - await client.request({ + const { body } = await client.request({ path: '/', method: 'GET', onInfo: (x) => { infos.push(x) } }) + await body.dump() t.equal(infos.length, 1) t.equal(infos[0].statusCode, 103) t.same(infos[0].headers, { link: '; rel=preload; as=style' }) @@ -589,12 +590,13 @@ test('request raw responseHeaders', async (t) => { const client = new Client(`http://localhost:${server.address().port}`) t.teardown(client.close.bind(client)) - const { headers } = await client.request({ + const { body, headers } = await client.request({ path: '/', method: 'GET', responseHeaders: 'raw', onInfo: (x) => { infos.push(x) } }) + await body.dump() t.equal(infos.length, 1) t.same(infos[0].headers, ['Link', '; rel=preload; as=style']) t.same(headers, ['Date', 'Sat, 09 Oct 2010 14:28:02 GMT', 'Connection', 'close']) diff --git a/test/close-and-destroy.js b/test/close-and-destroy.js index 3c850358dba..bd50ebb8e99 100644 --- a/test/close-and-destroy.js +++ b/test/close-and-destroy.js @@ -161,7 +161,7 @@ test('close should still reconnect', (t) => { t.ok(!makeRequest()) client.close((err) => { - t.equal(err, null) + t.error(err) t.equal(client.closed, true) }) client.once('connect', () => { @@ -196,7 +196,7 @@ test('close should call callback once finished', (t) => { t.ok(!makeRequest()) client.close((err) => { - t.equal(err, null) + t.error(err) t.equal(client.closed, true) }) diff --git a/test/mock-agent.js b/test/mock-agent.js index a0301f208d8..3c4edbf81b9 100644 --- a/test/mock-agent.js +++ b/test/mock-agent.js @@ -6,7 +6,7 @@ const { promisify } = require('util') const { request, setGlobalDispatcher, MockAgent, Agent } = require('..') const { getResponse } = require('../lib/mock/mock-utils') const { kClients, kConnected } = require('../lib/core/symbols') -const { InvalidArgumentError, ClientClosedError } = require('../lib/core/errors') +const { InvalidArgumentError, ClientDestroyedError } = require('../lib/core/errors') const MockClient = require('../lib/mock/mock-client') const MockPool = require('../lib/mock/mock-pool') const { kAgent } = require('../lib/mock/mock-symbols') @@ -44,7 +44,6 @@ test('MockAgent - constructor', t => { const agent = new Agent() t.teardown(agent.close.bind(agent)) const mockAgent = new MockAgent({ agent }) - t.teardown(mockAgent.close.bind(mockAgent)) t.equal(mockAgent[kAgent], agent) }) @@ -238,7 +237,6 @@ test('MockAgent - .close should clean up registered pools', async (t) => { const baseUrl = 'http://localhost:9999' const mockAgent = new MockAgent() - t.teardown(mockAgent.close.bind(mockAgent)) // Register a pool const mockPool = mockAgent.get(baseUrl) @@ -257,7 +255,6 @@ test('MockAgent - .close should clean up registered clients', async (t) => { const baseUrl = 'http://localhost:9999' const mockAgent = new MockAgent({ connections: 1 }) - t.teardown(mockAgent.close.bind(mockAgent)) // Register a pool const mockClient = mockAgent.get(baseUrl) @@ -289,7 +286,6 @@ test('MockAgent - [kClients] should match encapsulated agent', async (t) => { t.teardown(agent.close.bind(agent)) const mockAgent = new MockAgent({ agent }) - t.teardown(mockAgent.close.bind(mockAgent)) const mockPool = mockAgent.get(baseUrl) mockPool.intercept({ @@ -454,7 +450,6 @@ test('MockAgent - should support specifying custom agents to mock', async (t) => const mockAgent = new MockAgent({ agent }) setGlobalDispatcher(mockAgent) - t.teardown(mockAgent.close.bind(mockAgent)) const mockPool = mockAgent.get(baseUrl) mockPool.intercept({ @@ -987,7 +982,6 @@ test('MockAgent - close removes all registered mock clients', async (t) => { const mockAgent = new MockAgent({ connections: 1 }) setGlobalDispatcher(mockAgent) - t.teardown(mockAgent.close.bind(mockAgent)) const mockClient = mockAgent.get(baseUrl) mockClient.intercept({ @@ -1001,7 +995,7 @@ test('MockAgent - close removes all registered mock clients', async (t) => { try { await request(`${baseUrl}/foo`, { method: 'GET' }) } catch (err) { - t.type(err, ClientClosedError) + t.type(err, ClientDestroyedError) } }) @@ -1022,7 +1016,6 @@ test('MockAgent - close removes all registered mock pools', async (t) => { const mockAgent = new MockAgent() setGlobalDispatcher(mockAgent) - t.teardown(mockAgent.close.bind(mockAgent)) const mockPool = mockAgent.get(baseUrl) mockPool.intercept({ @@ -1036,7 +1029,7 @@ test('MockAgent - close removes all registered mock pools', async (t) => { try { await request(`${baseUrl}/foo`, { method: 'GET' }) } catch (err) { - t.type(err, ClientClosedError) + t.type(err, ClientDestroyedError) } })