From e641bd44ef39d64b8b572e7a8ab6bfc71a2b4bed Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 28 Nov 2022 15:13:41 -0500 Subject: [PATCH] feat(NODE-4691): interrupt in-flight operations on heartbeat failure (#3457) --- src/cmap/connection_pool.ts | 58 ++++++++++++++++--- src/cmap/errors.ts | 27 +++++++-- src/error.ts | 1 + src/sdam/monitor.ts | 5 +- src/sdam/server.ts | 6 +- src/sdam/topology.ts | 6 +- src/sessions.ts | 2 +- ...ection_monitoring_and_pooling.spec.test.ts | 45 ++++++++------ ...rver_discovery_and_monitoring.spec.test.ts | 13 +---- test/tools/cmap_spec_runner.ts | 6 +- test/unit/sdam/server.test.ts | 4 +- 11 files changed, 118 insertions(+), 55 deletions(-) diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index f621b4f9b4..5c8cbc9765 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -41,7 +41,12 @@ import { ConnectionPoolReadyEvent, ConnectionReadyEvent } from './connection_pool_events'; -import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors'; +import { + PoolClearedError, + PoolClearedOnNetworkError, + PoolClosedError, + WaitQueueTimeoutError +} from './errors'; import { ConnectionPoolMetrics } from './metrics'; /** @internal */ @@ -382,6 +387,9 @@ export class ConnectionPool extends TypedEventEmitter { * @param connection - The connection to check in */ checkIn(connection: Connection): void { + if (!this[kCheckedOut].has(connection)) { + return; + } const poolClosed = this.closed; const stale = this.connectionIsStale(connection); const willDestroy = !!(poolClosed || stale || connection.closed); @@ -408,13 +416,19 @@ export class ConnectionPool extends TypedEventEmitter { * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a * previous generation will eventually be pruned during subsequent checkouts. */ - clear(serviceId?: ObjectId): void { + clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void { if (this.closed) { return; } // handle load balanced case - if (this.loadBalanced && serviceId) { + if (this.loadBalanced) { + const { serviceId } = options; + if (!serviceId) { + throw new MongoRuntimeError( + 'ConnectionPool.clear() called in load balanced mode with no serviceId.' + ); + } const sid = serviceId.toHexString(); const generation = this.serviceGenerations.get(sid); // Only need to worry if the generation exists, since it should @@ -431,19 +445,42 @@ export class ConnectionPool extends TypedEventEmitter { ); return; } - // handle non load-balanced case + const interruptInUseConnections = options.interruptInUseConnections ?? false; + const oldGeneration = this[kGeneration]; this[kGeneration] += 1; const alreadyPaused = this[kPoolState] === PoolState.paused; this[kPoolState] = PoolState.paused; this.clearMinPoolSizeTimer(); if (!alreadyPaused) { - this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this)); + this.emit( + ConnectionPool.CONNECTION_POOL_CLEARED, + new ConnectionPoolClearedEvent(this, { interruptInUseConnections }) + ); + } + + if (interruptInUseConnections) { + process.nextTick(() => this.interruptInUseConnections(oldGeneration)); } + this.processWaitQueue(); } + /** + * Closes all stale in-use connections in the pool with a resumable PoolClearedOnNetworkError. + * + * Only connections where `connection.generation <= minGeneration` are killed. + */ + private interruptInUseConnections(minGeneration: number) { + for (const connection of this[kCheckedOut]) { + if (connection.generation <= minGeneration) { + this.checkIn(connection); + connection.onError(new PoolClearedOnNetworkError(this)); + } + } + } + /** Close the pool */ close(callback: Callback): void; close(options: CloseOptions, callback: Callback): void; @@ -572,7 +609,12 @@ export class ConnectionPool extends TypedEventEmitter { return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS); } - private connectionIsPerished(connection: Connection) { + /** + * Destroys a connection if the connection is perished. + * + * @returns `true` if the connection was destroyed, `false` otherwise. + */ + private destroyConnectionIfPerished(connection: Connection): boolean { const isStale = this.connectionIsStale(connection); const isIdle = this.connectionIsIdle(connection); if (!isStale && !isIdle && !connection.closed) { @@ -658,7 +700,7 @@ export class ConnectionPool extends TypedEventEmitter { return; } - this[kConnections].prune(connection => this.connectionIsPerished(connection)); + this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection)); if ( this.totalConnectionCount < minPoolSize && @@ -734,7 +776,7 @@ export class ConnectionPool extends TypedEventEmitter { break; } - if (!this.connectionIsPerished(connection)) { + if (!this.destroyConnectionIfPerished(connection)) { this[kCheckedOut].add(connection); this.emit( ConnectionPool.CONNECTION_CHECKED_OUT, diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index 4e0d25a27c..f6d2ed5888 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -1,4 +1,4 @@ -import { MongoDriverError, MongoNetworkError } from '../error'; +import { MongoDriverError, MongoErrorLabel, MongoNetworkError } from '../error'; import type { ConnectionPool } from './connection_pool'; /** @@ -27,11 +27,14 @@ export class PoolClearedError extends MongoNetworkError { /** The address of the connection pool */ address: string; - constructor(pool: ConnectionPool) { - super( - `Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"` - ); + constructor(pool: ConnectionPool, message?: string) { + const errorMessage = message + ? message + : `Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`; + super(errorMessage); this.address = pool.address; + + this.addErrorLabel(MongoErrorLabel.RetryableWriteError); } override get name(): string { @@ -39,6 +42,20 @@ export class PoolClearedError extends MongoNetworkError { } } +/** + * An error indicating that a connection pool has been cleared after the monitor for that server timed out. + * @category Error + */ +export class PoolClearedOnNetworkError extends PoolClearedError { + constructor(pool: ConnectionPool) { + super(pool, `Connection to ${pool.address} interrupted due to server monitor timeout`); + } + + override get name(): string { + return 'PoolClearedOnNetworkError'; + } +} + /** * An error thrown when a request to check out a connection times out * @category Error diff --git a/src/error.ts b/src/error.ts index 800be95c8b..1dd426cb4f 100644 --- a/src/error.ts +++ b/src/error.ts @@ -91,6 +91,7 @@ export const MongoErrorLabel = Object.freeze({ ResumableChangeStreamError: 'ResumableChangeStreamError', HandshakeError: 'HandshakeError', ResetPool: 'ResetPool', + InterruptInUseConnections: 'InterruptInUseConnections', NoWritesPerformed: 'NoWritesPerformed' } as const); diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 3711dc59ed..b35093b435 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -4,7 +4,7 @@ import { Document, Long } from '../bson'; import { connect } from '../cmap/connect'; import { Connection, ConnectionOptions } from '../cmap/connection'; import { LEGACY_HELLO_COMMAND } from '../constants'; -import { MongoError, MongoErrorLabel } from '../error'; +import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Callback } from '../utils'; import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils'; @@ -221,6 +221,9 @@ function checkServer(monitor: Monitor, callback: Callback) { const error = !(err instanceof MongoError) ? new MongoError(err) : err; error.addErrorLabel(MongoErrorLabel.ResetPool); + if (error instanceof MongoNetworkTimeoutError) { + error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections); + } monitor.emit('resetServer', error); callback(err); diff --git a/src/sdam/server.ts b/src/sdam/server.ts index a693bf3bc8..ae7a1fd5f6 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -354,8 +354,6 @@ export class Server extends TypedEventEmitter { } if (!(err instanceof PoolClearedError)) { this.handleError(err); - } else { - err.addErrorLabel(MongoErrorLabel.RetryableWriteError); } return cb(err); } @@ -400,14 +398,14 @@ export class Server extends TypedEventEmitter { error.addErrorLabel(MongoErrorLabel.ResetPool); markServerUnknown(this, error); } else if (connection) { - this.s.pool.clear(connection.serviceId); + this.s.pool.clear({ serviceId: connection.serviceId }); } } else { if (isSDAMUnrecoverableError(error)) { if (shouldHandleStateChangeError(this, error)) { const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error); if (this.loadBalanced && connection && shouldClearPool) { - this.s.pool.clear(connection.serviceId); + this.s.pool.clear({ serviceId: connection.serviceId }); } if (!this.loadBalanced) { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index d157ef63f6..601ae2c382 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -839,7 +839,11 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes incomingServerDescription.error instanceof MongoError && incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool) ) { - server.s.pool.clear(); + const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel( + MongoErrorLabel.InterruptInUseConnections + ); + + server.s.pool.clear({ interruptInUseConnections }); } else if (incomingServerDescription.error == null) { const newTopologyType = topology.s.description.type; const shouldMarkPoolReady = diff --git a/src/sessions.ts b/src/sessions.ts index 21468aff11..f2b7d5281e 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -537,7 +537,7 @@ export function maybeClearPinnedConnection( ); if (options?.forceClear) { - loadBalancer.s.pool.clear(conn.serviceId); + loadBalancer.s.pool.clear({ serviceId: conn.serviceId }); } } diff --git a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts index 66b0b40d28..ac1066a168 100644 --- a/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts @@ -12,34 +12,43 @@ const LB_SKIP_TESTS: SkipDescription[] = [ 'clearing a paused pool emits no events', 'after clear, cannot check out connections until pool ready', 'readying a ready pool emits no events', - 'error during minPoolSize population clears pool' + 'error during minPoolSize population clears pool', + 'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)' ].map(description => ({ description, skipIfCondition: 'loadBalanced', skipReason: 'cannot run against a load balanced environment' })); -const INTERRUPT_IN_USE_CONNECTIONS_TESTS: SkipDescription[] = [ - 'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)', - 'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)', - 'clear with interruptInUseConnections = true closes pending connections' -].map(description => ({ - description, - skipIfCondition: 'always', - skipReason: 'TODO(NODE-4691): cancel inflight operations when heartbeat fails' -})); +const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [ + { + description: 'clear with interruptInUseConnections = true closes pending connections', + skipIfCondition: 'always', + skipReason: 'TODO(NODE-4784): track and kill pending connections' + }, + { + description: + 'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)', + skipIfCondition: 'always', + skipReason: + 'NodeJS does not have a background thread responsible for managing connections, and so already checked in connections are not pruned when in-use connections are interrupted.' + } +]; describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () { const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling'); runCmapTestSuite(tests, { - testsToSkip: LB_SKIP_TESTS.concat([ - { - description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS', - skipIfCondition: 'always', - skipReason: - 'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver' - } - ]).concat(INTERRUPT_IN_USE_CONNECTIONS_TESTS) + testsToSkip: LB_SKIP_TESTS.concat( + [ + { + description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS', + skipIfCondition: 'always', + skipReason: + 'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver' + } + ], + INTERRUPT_IN_USE_SKIPPED_TESTS + ) }); }); diff --git a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts index a990353aca..74fc67efc6 100644 --- a/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.spec.test.ts @@ -4,16 +4,5 @@ import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; describe('SDAM Unified Tests', function () { - const sdamPoolClearedTests = [ - 'Connection pool clear uses interruptInUseConnections=true after monitor timeout', - 'Error returned from connection pool clear with interruptInUseConnections=true is retryable', - 'Error returned from connection pool clear with interruptInUseConnections=true is retryable for write' - ]; - runUnifiedSuite( - loadSpecTests(path.join('server-discovery-and-monitoring', 'unified')), - ({ description }) => - sdamPoolClearedTests.includes(description) - ? 'TODO(NODE-4691): interrupt in-use operations on heartbeat failure' - : false - ); + runUnifiedSuite(loadSpecTests(path.join('server-discovery-and-monitoring', 'unified'))); }); diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index d5384fc287..634e732f3e 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -197,10 +197,8 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({ return threadContext.pool.checkIn(connection); }, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - clear: function (interruptInUseConnections: boolean) { - // TODO(NODE-4619): pass interruptInUseConnections into clear pool method - return threadContext.pool.clear(); + clear: function ({ interruptInUseConnections }: { interruptInUseConnections: boolean }) { + return threadContext.pool.clear({ interruptInUseConnections }); }, close: async function () { return await promisify(ConnectionPool.prototype.close).call(threadContext.pool); diff --git a/test/unit/sdam/server.test.ts b/test/unit/sdam/server.test.ts index cf810d8754..188c290ba4 100644 --- a/test/unit/sdam/server.test.ts +++ b/test/unit/sdam/server.test.ts @@ -105,7 +105,9 @@ describe('Server', () => { expect(newDescription).to.have.nested.property('[0].type', ServerType.Unknown); } else { expect(newDescription).to.be.undefined; - expect(server.s.pool.clear).to.have.been.calledOnceWith(connection!.serviceId); + expect(server.s.pool.clear).to.have.been.calledOnceWith({ + serviceId: connection!.serviceId + }); } });