Skip to content

Commit

Permalink
feat(NODE-4691): interrupt in-flight operations on heartbeat failure (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Nov 28, 2022
1 parent 73e92ce commit e641bd4
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 55 deletions.
58 changes: 50 additions & 8 deletions src/cmap/connection_pool.ts
Expand Up @@ -41,7 +41,12 @@ import {
ConnectionPoolReadyEvent,
ConnectionReadyEvent
} from './connection_pool_events';
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
import {
PoolClearedError,
PoolClearedOnNetworkError,
PoolClosedError,
WaitQueueTimeoutError
} from './errors';
import { ConnectionPoolMetrics } from './metrics';

/** @internal */
Expand Down Expand Up @@ -382,6 +387,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* @param connection - The connection to check in
*/
checkIn(connection: Connection): void {
if (!this[kCheckedOut].has(connection)) {
return;
}
const poolClosed = this.closed;
const stale = this.connectionIsStale(connection);
const willDestroy = !!(poolClosed || stale || connection.closed);
Expand All @@ -408,13 +416,19 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
* previous generation will eventually be pruned during subsequent checkouts.
*/
clear(serviceId?: ObjectId): void {
clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void {
if (this.closed) {
return;
}

// handle load balanced case
if (this.loadBalanced && serviceId) {
if (this.loadBalanced) {
const { serviceId } = options;
if (!serviceId) {
throw new MongoRuntimeError(
'ConnectionPool.clear() called in load balanced mode with no serviceId.'
);
}
const sid = serviceId.toHexString();
const generation = this.serviceGenerations.get(sid);
// Only need to worry if the generation exists, since it should
Expand All @@ -431,19 +445,42 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
return;
}

// handle non load-balanced case
const interruptInUseConnections = options.interruptInUseConnections ?? false;
const oldGeneration = this[kGeneration];
this[kGeneration] += 1;
const alreadyPaused = this[kPoolState] === PoolState.paused;
this[kPoolState] = PoolState.paused;

this.clearMinPoolSizeTimer();
if (!alreadyPaused) {
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
this.emit(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, { interruptInUseConnections })
);
}

if (interruptInUseConnections) {
process.nextTick(() => this.interruptInUseConnections(oldGeneration));
}

this.processWaitQueue();
}

/**
 * Interrupts checked-out connections that belong to an old pool generation.
 *
 * Walks the set of checked-out connections. Every connection whose `generation` is less than or
 * equal to `minGeneration` is checked back in and then handed a `PoolClearedOnNetworkError`
 * through `connection.onError`. Connections from newer generations are left untouched.
 *
 * @param minGeneration - The highest connection generation that is interrupted.
 */
private interruptInUseConnections(minGeneration: number) {
  for (const checkedOut of this[kCheckedOut]) {
    const isTargetGeneration = checkedOut.generation <= minGeneration;
    if (!isTargetGeneration) {
      continue;
    }

    // Check the connection in first, then surface the pool-cleared error on it.
    this.checkIn(checkedOut);
    checkedOut.onError(new PoolClearedOnNetworkError(this));
  }
}

/** Close the pool */
close(callback: Callback<void>): void;
close(options: CloseOptions, callback: Callback<void>): void;
Expand Down Expand Up @@ -572,7 +609,12 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS);
}

private connectionIsPerished(connection: Connection) {
/**
* Destroys a connection if the connection is perished.
*
* @returns `true` if the connection was destroyed, `false` otherwise.
*/
private destroyConnectionIfPerished(connection: Connection): boolean {
const isStale = this.connectionIsStale(connection);
const isIdle = this.connectionIsIdle(connection);
if (!isStale && !isIdle && !connection.closed) {
Expand Down Expand Up @@ -658,7 +700,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return;
}

this[kConnections].prune(connection => this.connectionIsPerished(connection));
this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection));

if (
this.totalConnectionCount < minPoolSize &&
Expand Down Expand Up @@ -734,7 +776,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
break;
}

if (!this.connectionIsPerished(connection)) {
if (!this.destroyConnectionIfPerished(connection)) {
this[kCheckedOut].add(connection);
this.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
Expand Down
27 changes: 22 additions & 5 deletions src/cmap/errors.ts
@@ -1,4 +1,4 @@
import { MongoDriverError, MongoNetworkError } from '../error';
import { MongoDriverError, MongoErrorLabel, MongoNetworkError } from '../error';
import type { ConnectionPool } from './connection_pool';

/**
Expand Down Expand Up @@ -27,18 +27,35 @@ export class PoolClearedError extends MongoNetworkError {
/** The address of the connection pool */
address: string;

constructor(pool: ConnectionPool) {
super(
`Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`
);
constructor(pool: ConnectionPool, message?: string) {
const errorMessage = message
? message
: `Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`;
super(errorMessage);
this.address = pool.address;

this.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}

override get name(): string {
return 'MongoPoolClearedError';
}
}

/**
 * A pool-cleared error whose message reports that the connection was interrupted because the
 * server monitor timed out.
 * @category Error
 */
export class PoolClearedOnNetworkError extends PoolClearedError {
  constructor(pool: ConnectionPool) {
    // Build the message up front and pass it to the parent constructor as its explicit message.
    const interruptedMessage = `Connection to ${pool.address} interrupted due to server monitor timeout`;
    super(pool, interruptedMessage);
  }

  /** The error name reported for this class. */
  override get name(): string {
    return 'PoolClearedOnNetworkError';
  }
}

/**
* An error thrown when a request to check out a connection times out
* @category Error
Expand Down
1 change: 1 addition & 0 deletions src/error.ts
Expand Up @@ -91,6 +91,7 @@ export const MongoErrorLabel = Object.freeze({
ResumableChangeStreamError: 'ResumableChangeStreamError',
HandshakeError: 'HandshakeError',
ResetPool: 'ResetPool',
InterruptInUseConnections: 'InterruptInUseConnections',
NoWritesPerformed: 'NoWritesPerformed'
} as const);

Expand Down
5 changes: 4 additions & 1 deletion src/sdam/monitor.ts
Expand Up @@ -4,7 +4,7 @@ import { Document, Long } from '../bson';
import { connect } from '../cmap/connect';
import { Connection, ConnectionOptions } from '../cmap/connection';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel } from '../error';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Callback } from '../utils';
import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils';
Expand Down Expand Up @@ -221,6 +221,9 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {

const error = !(err instanceof MongoError) ? new MongoError(err) : err;
error.addErrorLabel(MongoErrorLabel.ResetPool);
if (error instanceof MongoNetworkTimeoutError) {
error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
}

monitor.emit('resetServer', error);
callback(err);
Expand Down
6 changes: 2 additions & 4 deletions src/sdam/server.ts
Expand Up @@ -354,8 +354,6 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}
if (!(err instanceof PoolClearedError)) {
this.handleError(err);
} else {
err.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}
return cb(err);
}
Expand Down Expand Up @@ -400,14 +398,14 @@ export class Server extends TypedEventEmitter<ServerEvents> {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(this, error);
} else if (connection) {
this.s.pool.clear(connection.serviceId);
this.s.pool.clear({ serviceId: connection.serviceId });
}
} else {
if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(this, error)) {
const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error);
if (this.loadBalanced && connection && shouldClearPool) {
this.s.pool.clear(connection.serviceId);
this.s.pool.clear({ serviceId: connection.serviceId });
}

if (!this.loadBalanced) {
Expand Down
6 changes: 5 additions & 1 deletion src/sdam/topology.ts
Expand Up @@ -839,7 +839,11 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
incomingServerDescription.error instanceof MongoError &&
incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool)
) {
server.s.pool.clear();
const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel(
MongoErrorLabel.InterruptInUseConnections
);

server.s.pool.clear({ interruptInUseConnections });
} else if (incomingServerDescription.error == null) {
const newTopologyType = topology.s.description.type;
const shouldMarkPoolReady =
Expand Down
2 changes: 1 addition & 1 deletion src/sessions.ts
Expand Up @@ -537,7 +537,7 @@ export function maybeClearPinnedConnection(
);

if (options?.forceClear) {
loadBalancer.s.pool.clear(conn.serviceId);
loadBalancer.s.pool.clear({ serviceId: conn.serviceId });
}
}

Expand Down
Expand Up @@ -12,34 +12,43 @@ const LB_SKIP_TESTS: SkipDescription[] = [
'clearing a paused pool emits no events',
'after clear, cannot check out connections until pool ready',
'readying a ready pool emits no events',
'error during minPoolSize population clears pool'
'error during minPoolSize population clears pool',
'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)'
].map(description => ({
description,
skipIfCondition: 'loadBalanced',
skipReason: 'cannot run against a load balanced environment'
}));

const INTERRUPT_IN_USE_CONNECTIONS_TESTS: SkipDescription[] = [
'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)',
'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)',
'clear with interruptInUseConnections = true closes pending connections'
].map(description => ({
description,
skipIfCondition: 'always',
skipReason: 'TODO(NODE-4691): cancel inflight operations when heartbeat fails'
}));
// Spec tests concerning `interruptInUseConnections` that are skipped unconditionally
// (`skipIfCondition: 'always'`). Each entry carries its own `skipReason`.
const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [
{
// Skipped pending the follow-up ticket named in the skip reason.
description: 'clear with interruptInUseConnections = true closes pending connections',
skipIfCondition: 'always',
skipReason: 'TODO(NODE-4784): track and kill pending connections'
},
{
// Skipped as not applicable; the skip reason explains why.
description:
'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)',
skipIfCondition: 'always',
skipReason:
'NodeJS does not have a background thread responsible for managing connections, and so already checked in connections are not pruned when in-use connections are interrupted.'
}
];

describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');

runCmapTestSuite(tests, {
testsToSkip: LB_SKIP_TESTS.concat([
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
]).concat(INTERRUPT_IN_USE_CONNECTIONS_TESTS)
testsToSkip: LB_SKIP_TESTS.concat(
[
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
],
INTERRUPT_IN_USE_SKIPPED_TESTS
)
});
});
Expand Up @@ -4,16 +4,5 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('SDAM Unified Tests', function () {
const sdamPoolClearedTests = [
'Connection pool clear uses interruptInUseConnections=true after monitor timeout',
'Error returned from connection pool clear with interruptInUseConnections=true is retryable',
'Error returned from connection pool clear with interruptInUseConnections=true is retryable for write'
];
runUnifiedSuite(
loadSpecTests(path.join('server-discovery-and-monitoring', 'unified')),
({ description }) =>
sdamPoolClearedTests.includes(description)
? 'TODO(NODE-4691): interrupt in-use operations on heartbeat failure'
: false
);
runUnifiedSuite(loadSpecTests(path.join('server-discovery-and-monitoring', 'unified')));
});
6 changes: 2 additions & 4 deletions test/tools/cmap_spec_runner.ts
Expand Up @@ -197,10 +197,8 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({

return threadContext.pool.checkIn(connection);
},
// eslint-disable-next-line @typescript-eslint/no-unused-vars
clear: function (interruptInUseConnections: boolean) {
// TODO(NODE-4619): pass interruptInUseConnections into clear pool method
return threadContext.pool.clear();
// Spec-runner `clear` operation: forwards the `interruptInUseConnections` flag to the pool's clear().
clear: function (clearArgs: { interruptInUseConnections: boolean }) {
  const { interruptInUseConnections } = clearArgs;
  return threadContext.pool.clear({ interruptInUseConnections });
},
close: async function () {
return await promisify(ConnectionPool.prototype.close).call(threadContext.pool);
Expand Down
4 changes: 3 additions & 1 deletion test/unit/sdam/server.test.ts
Expand Up @@ -105,7 +105,9 @@ describe('Server', () => {
expect(newDescription).to.have.nested.property('[0].type', ServerType.Unknown);
} else {
expect(newDescription).to.be.undefined;
expect(server.s.pool.clear).to.have.been.calledOnceWith(connection!.serviceId);
expect(server.s.pool.clear).to.have.been.calledOnceWith({
serviceId: connection!.serviceId
});
}
});

Expand Down

0 comments on commit e641bd4

Please sign in to comment.