diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 1a5086f0cc..d34edd38a5 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -107,7 +107,7 @@ export type ConnectionPoolEvents = { */ export class ConnectionPool extends TypedEventEmitter { closed: boolean; - options: Readonly; + options: Readonly; /** @internal */ [kLogger]: Logger; /** @internal */ @@ -199,6 +199,7 @@ export class ConnectionPool extends TypedEventEmitter { connectionType: Connection, maxPoolSize: options.maxPoolSize ?? 100, minPoolSize: options.minPoolSize ?? 0, + maxConnecting: 2, maxIdleTimeMS: options.maxIdleTimeMS ?? 0, waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0, autoEncrypter: options.autoEncrypter, @@ -494,16 +495,29 @@ export class ConnectionPool extends TypedEventEmitter { } function ensureMinPoolSize(pool: ConnectionPool) { - if (pool.closed || pool.options.minPoolSize === 0) { + const minPoolSize = pool.options.minPoolSize; + if (pool.closed || minPoolSize === 0) { return; } - const minPoolSize = pool.options.minPoolSize; - for (let i = pool.totalConnectionCount; i < minPoolSize; ++i) { - createConnection(pool); + if ( + pool.totalConnectionCount < minPoolSize && + pool.pendingConnectionCount < pool.options.maxConnecting + ) { + // NOTE: ensureMinPoolSize should not try to get all the pending + // connection permits because that potentially delays the availability of + // the connection to a checkout request + createConnection(pool, (err, connection) => { + pool[kPending]--; + if (!err && connection) { + pool[kConnections].push(connection); + process.nextTick(processWaitQueue, pool); + } + pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10); + }); + } else { + pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 100); } - - pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10); } function connectionIsStale(pool: ConnectionPool, connection: Connection) { @@ -521,7 +535,7 @@ function connectionIsIdle(pool: ConnectionPool, connection: Connection) { return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS); } -function createConnection(pool: ConnectionPool, callback?: Callback) { +function createConnection(pool: ConnectionPool, callback: Callback) { const connectOptions: ConnectionOptions = { ...pool.options, id: pool[kConnectionCounter].next().value, @@ -530,14 +544,16 @@ function createConnection(pool: ConnectionPool, callback?: Callback) }; pool[kPending]++; + // This is our version of a "virtual" no-I/O connection as the spec requires + pool.emit( + ConnectionPool.CONNECTION_CREATED, + new ConnectionCreatedEvent(pool, { id: connectOptions.id }) + ); + connect(connectOptions, (err, connection) => { if (err || !connection) { - pool[kPending]--; pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); - if (typeof callback === 'function') { - callback(err); - } - + callback(err); return; } @@ -553,8 +569,6 @@ function createConnection(pool: ConnectionPool, callback?: Callback) connection.on(event, (e: any) => pool.emit(event, e)); } - pool.emit(ConnectionPool.CONNECTION_CREATED, new ConnectionCreatedEvent(pool, connection)); - if (pool.loadBalanced) { connection.on(Connection.PINNED, pinType => pool[kMetrics].markPinned(pinType)); connection.on(Connection.UNPINNED, pinType => pool[kMetrics].markUnpinned(pinType)); @@ -575,16 +589,8 @@ function createConnection(pool: ConnectionPool, callback?: Callback) connection.markAvailable(); pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection)); - // if a callback has been provided, hand off the connection immediately - if (typeof callback === 'function') { - callback(undefined, connection); - return; - } - - // otherwise add it to the pool for later acquisition, and try to process the wait queue - pool[kConnections].push(connection); - pool[kPending]--; - process.nextTick(processWaitQueue, pool); + callback(undefined, connection); + return; }); } @@ -642,44 +648,45 @@ function processWaitQueue(pool: ConnectionPool) { } } - const maxPoolSize = pool.options.maxPoolSize; - if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) { + const { maxPoolSize, maxConnecting } = pool.options; + while ( + pool.waitQueueSize > 0 && + pool.pendingConnectionCount < maxConnecting && + (maxPoolSize === 0 || pool.totalConnectionCount < maxPoolSize) + ) { + const waitQueueMember = pool[kWaitQueue].shift(); + if (!waitQueueMember || waitQueueMember[kCancelled]) { + continue; + } createConnection(pool, (err, connection) => { - const waitQueueMember = pool[kWaitQueue].shift(); - if (!waitQueueMember || waitQueueMember[kCancelled]) { + pool[kPending]--; + if (waitQueueMember[kCancelled]) { if (!err && connection) { pool[kConnections].push(connection); - pool[kPending]--; + } + } else { + if (err) { + pool.emit( + ConnectionPool.CONNECTION_CHECK_OUT_FAILED, + new ConnectionCheckOutFailedEvent(pool, err) + ); + } else if (connection) { + pool[kCheckedOut]++; + pool.emit( + ConnectionPool.CONNECTION_CHECKED_OUT, + new ConnectionCheckedOutEvent(pool, connection) + ); } - pool[kProcessingWaitQueue] = false; - return; - } - - if (err) { - pool.emit( - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(pool, err) - ); - } else if (connection) { - pool[kCheckedOut]++; - pool[kPending]--; - pool.emit( - ConnectionPool.CONNECTION_CHECKED_OUT, - new ConnectionCheckedOutEvent(pool, connection) - ); - } - - if (waitQueueMember.timer) { - clearTimeout(waitQueueMember.timer); + if (waitQueueMember.timer) { + clearTimeout(waitQueueMember.timer); + } + waitQueueMember.callback(err, connection); } - waitQueueMember.callback(err, connection); - pool[kProcessingWaitQueue] = false; - process.nextTick(() => processWaitQueue(pool)); + process.nextTick(processWaitQueue, pool); }); - } else { - pool[kProcessingWaitQueue] = false; } + pool[kProcessingWaitQueue] = false; } /** diff --git a/src/cmap/connection_pool_events.ts b/src/cmap/connection_pool_events.ts index ee67272b99..a98dc25d78 100644 --- a/src/cmap/connection_pool_events.ts +++ b/src/cmap/connection_pool_events.ts @@ -59,7 +59,7 @@ export class ConnectionCreatedEvent extends ConnectionPoolMonitoringEvent { connectionId: number | ''; /** @internal */ - constructor(pool: ConnectionPool, connection: Connection) { + constructor(pool: ConnectionPool, connection: { id: number | '' }) { super(pool); this.connectionId = connection.id; } diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json index 1d3f6ecfd2..cb619e2f75 100644 --- a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-population-stats.json @@ -33,16 +33,16 @@ "address": 42, "currentCheckedOutCount": 0, "availableConnectionCount": 0, - "pendingConnectionCount": 3, - "totalConnectionCount": 3 + "pendingConnectionCount": 1, + "totalConnectionCount": 1 }, { "type": "ConnectionCreated", "connectionId": 42, "address": 42, "availableConnectionCount": 1, - "pendingConnectionCount": 2, - "totalConnectionCount": 3 + "pendingConnectionCount": 1, + "totalConnectionCount": 2 }, { "type": "ConnectionCreated", diff --git a/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json new file mode 100644 index 0000000000..0238a09fad --- /dev/null +++ b/test/integration/connection-monitoring-and-pooling/cmap-node-specs/pool-minPoolSize-replace-removed-connections.json @@ -0,0 +1,77 @@ +{ + "version": 1, + "style": "unit", + "description": "must replace removed connections up to minPoolSize", + "poolOptions": { + "minPoolSize": 2 + }, + "operations": [ + { + "name": "waitForEvent", + "event": "ConnectionReady", + "count": 2 + }, + { + "name": "wait", + "ms": 1000 + }, + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "clear" + }, + { + "name": "checkIn", + "connection": "conn" + }, + { + "name": "waitForEvent", + "event": "ConnectionReady", + "count": 3 + } + ], + "events": [ + { + "type": "ConnectionReady", + "address": 42 + }, + { + "type": "ConnectionReady", + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, + { + "type": "ConnectionCheckedIn", + "address": 42 + }, + { + "type": "ConnectionClosed", + "reason": "stale", + "address": 42, + "availableConnectionCount": 1, + "pendingConnectionCount": 0, + "totalConnectionCount": 1 + }, + { + "type": "ConnectionReady", + "address": 42, + "availableConnectionCount": 1, + "pendingConnectionCount": 1, + "totalConnectionCount": 2 + } + ], + "ignore": [ + "ConnectionPoolCreated", + "ConnectionCreated", + "ConnectionCheckOutStarted" + ] +} diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index 81f2801546..914f04b27d 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -15,9 +15,14 @@ const LB_SKIP_TESTS: SkipDescription[] = [ describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () { const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling'); - runCmapTestSuite( - // TODO(NODE-2993): unskip integration tests for maxConnecting - tests.filter(({ style }) => style === 'unit'), - { testsToSkip: LB_SKIP_TESTS } - ); + runCmapTestSuite(tests, { + testsToSkip: LB_SKIP_TESTS.concat([ + { + description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS', + skipIfCondition: 'always', + skipReason: + 'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver' + } + ]) + }); }); diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.test.ts index 2662e6629a..b76cd66a50 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.test.ts @@ -6,5 +6,14 @@ describe('Connection Monitoring and Pooling (Node Driver)', function () { '../integration/connection-monitoring-and-pooling/cmap-node-specs' ); - runCmapTestSuite(tests, { injectPoolStats: true }); + runCmapTestSuite(tests, { + injectPoolStats: true, + testsToSkip: [ + { + description: 'must replace removed connections up to minPoolSize', + skipIfCondition: 'loadBalanced', + skipReason: 'cannot run against load balancer due to reliance on pool.clear() command' + } + ] + }); }); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index e80372ea17..78d6178f1b 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -116,12 +116,14 @@ class Thread { await sleep(); } - queue(op: CmapOperation) { + queue(op: CmapOperation, thread?: Thread) { if (this.#killed || this.#error) { return; } - this.#promise = this.#promise.then(() => this._runOperation(op)).catch(e => (this.#error = e)); + const functionToQueue = () => (!thread ? this._runOperation(op) : thread.queue(op)); + + this.#promise = this.#promise.then(functionToQueue).catch(e => (this.#error = e)); } async finish() { @@ -352,9 +354,12 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { const op = operations[idx]; const threadKey = op.name === 'checkOut' ? op.thread || MAIN_THREAD_KEY : MAIN_THREAD_KEY; - const thread = threadContext.getThread(threadKey); - - thread.queue(op); + if (threadKey === MAIN_THREAD_KEY) { + mainThread.queue(op); + } else { + const thread = threadContext.getThread(threadKey); + mainThread.queue(op, thread); + } } await mainThread.finish().catch(e => { @@ -387,6 +392,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { ); expect(actualEvents).to.have.lengthOf(expectedEvents.length); + for (const expected of expectedEvents) { const actual = actualEvents.shift(); const { type: eventType, ...eventPropsToCheck } = expected; @@ -397,7 +403,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) { export type SkipDescription = { description: string; - skipIfCondition: 'loadBalanced'; + skipIfCondition: 'loadBalanced' | 'always'; skipReason: string; }; @@ -416,10 +422,12 @@ export function runCmapTestSuite( ({ description }) => description === test.description ); if (skipDescription) { + const alwaysSkip = skipDescription.skipIfCondition === 'always'; const matchesLoadBalanceSkip = skipDescription.skipIfCondition === 'loadBalanced' && this.configuration.isLoadBalanced; - if (matchesLoadBalanceSkip) { - (this.currentTest as Mocha.Runnable).skipReason = skipDescription.skipReason; + + if (alwaysSkip || matchesLoadBalanceSkip) { + this.currentTest.skipReason = skipDescription.skipReason; this.skip(); } }