Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-4691): interrupt in-flight operations on heartbeat failure #3457

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 50 additions & 8 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ import {
ConnectionPoolReadyEvent,
ConnectionReadyEvent
} from './connection_pool_events';
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
import {
PoolClearedError,
PoolClearedOnNetworkError,
PoolClosedError,
WaitQueueTimeoutError
} from './errors';
import { ConnectionPoolMetrics } from './metrics';

/** @internal */
Expand Down Expand Up @@ -382,6 +387,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* @param connection - The connection to check in
*/
checkIn(connection: Connection): void {
if (!this[kCheckedOut].has(connection)) {
return;
}
const poolClosed = this.closed;
const stale = this.connectionIsStale(connection);
const willDestroy = !!(poolClosed || stale || connection.closed);
Expand All @@ -408,13 +416,19 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
* previous generation will eventually be pruned during subsequent checkouts.
*/
clear(serviceId?: ObjectId): void {
clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void {
if (this.closed) {
return;
}

// handle load balanced case
if (this.loadBalanced && serviceId) {
if (this.loadBalanced) {
const { serviceId } = options;
if (!serviceId) {
throw new MongoRuntimeError(
'ConnectionPool.clear() called in load balanced mode with no serviceId.'
);
}
const sid = serviceId.toHexString();
const generation = this.serviceGenerations.get(sid);
// Only need to worry if the generation exists, since it should
Expand All @@ -431,19 +445,42 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
return;
}

// handle non load-balanced case
const interruptInUseConnections = options.interruptInUseConnections ?? false;
const oldGeneration = this[kGeneration];
this[kGeneration] += 1;
const alreadyPaused = this[kPoolState] === PoolState.paused;
this[kPoolState] = PoolState.paused;

this.clearMinPoolSizeTimer();
if (!alreadyPaused) {
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
this.emit(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, { interruptInUseConnections })
);
}

if (interruptInUseConnections) {
process.nextTick(() => this.interruptInUseConnections(oldGeneration));
}

this.processWaitQueue();
}

/**
* Closes all stale in-use connections in the pool with a resumable PoolClearedOnNetworkError.
*
* Only connections where `connection.generation <= minGeneration` are killed.
*/
private interruptInUseConnections(minGeneration: number) {
for (const connection of this[kCheckedOut]) {
if (connection.generation <= minGeneration) {
this.checkIn(connection);
connection.onError(new PoolClearedOnNetworkError(this));
}
}
}

/** Close the pool */
close(callback: Callback<void>): void;
close(options: CloseOptions, callback: Callback<void>): void;
Expand Down Expand Up @@ -572,7 +609,12 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS);
}

private connectionIsPerished(connection: Connection) {
/**
* Destroys a connection if the connection is perished.
*
* @returns `true` if the connection was destroyed, `false` otherwise.
*/
private destroyConnectionIfPerished(connection: Connection): boolean {
const isStale = this.connectionIsStale(connection);
const isIdle = this.connectionIsIdle(connection);
if (!isStale && !isIdle && !connection.closed) {
Expand Down Expand Up @@ -658,7 +700,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return;
}

this[kConnections].prune(connection => this.connectionIsPerished(connection));
this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection));

if (
this.totalConnectionCount < minPoolSize &&
Expand Down Expand Up @@ -734,7 +776,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
break;
}

if (!this.connectionIsPerished(connection)) {
if (!this.destroyConnectionIfPerished(connection)) {
this[kCheckedOut].add(connection);
this.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
Expand Down
27 changes: 22 additions & 5 deletions src/cmap/errors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MongoDriverError, MongoNetworkError } from '../error';
import { MongoDriverError, MongoErrorLabel, MongoNetworkError } from '../error';
import type { ConnectionPool } from './connection_pool';

/**
Expand Down Expand Up @@ -27,18 +27,35 @@ export class PoolClearedError extends MongoNetworkError {
/** The address of the connection pool */
address: string;

constructor(pool: ConnectionPool) {
super(
`Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`
);
constructor(pool: ConnectionPool, message?: string) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const errorMessage = message
? message
: `Connection pool for ${pool.address} was cleared because another operation failed with: "${pool.serverError?.message}"`;
super(errorMessage);
this.address = pool.address;

this.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}

override get name(): string {
return 'MongoPoolClearedError';
}
}

/**
* An error indicating that a connection pool has been cleared after the monitor for that server timed out.
* @category Error
*/
export class PoolClearedOnNetworkError extends PoolClearedError {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
constructor(pool: ConnectionPool) {
super(pool, `Connection to ${pool.address} interrupted due to server monitor timeout`);
}

override get name(): string {
return 'PoolClearedOnNetworkError';
}
}

/**
* An error thrown when a request to check out a connection times out
* @category Error
Expand Down
1 change: 1 addition & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export const MongoErrorLabel = Object.freeze({
ResumableChangeStreamError: 'ResumableChangeStreamError',
HandshakeError: 'HandshakeError',
ResetPool: 'ResetPool',
InterruptInUseConnections: 'InterruptInUseConnections',
NoWritesPerformed: 'NoWritesPerformed'
} as const);

Expand Down
5 changes: 4 additions & 1 deletion src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Document, Long } from '../bson';
import { connect } from '../cmap/connect';
import { Connection, ConnectionOptions } from '../cmap/connection';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel } from '../error';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Callback } from '../utils';
import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils';
Expand Down Expand Up @@ -221,6 +221,9 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {

const error = !(err instanceof MongoError) ? new MongoError(err) : err;
error.addErrorLabel(MongoErrorLabel.ResetPool);
if (error instanceof MongoNetworkTimeoutError) {
error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
}

monitor.emit('resetServer', error);
callback(err);
Expand Down
6 changes: 2 additions & 4 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,6 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}
if (!(err instanceof PoolClearedError)) {
this.handleError(err);
} else {
err.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}
return cb(err);
}
Expand Down Expand Up @@ -400,14 +398,14 @@ export class Server extends TypedEventEmitter<ServerEvents> {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(this, error);
} else if (connection) {
this.s.pool.clear(connection.serviceId);
this.s.pool.clear({ serviceId: connection.serviceId });
}
} else {
if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(this, error)) {
const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error);
if (this.loadBalanced && connection && shouldClearPool) {
this.s.pool.clear(connection.serviceId);
this.s.pool.clear({ serviceId: connection.serviceId });
}

if (!this.loadBalanced) {
Expand Down
6 changes: 5 additions & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,11 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
incomingServerDescription.error instanceof MongoError &&
incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool)
) {
server.s.pool.clear();
const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel(
MongoErrorLabel.InterruptInUseConnections
);

server.s.pool.clear({ interruptInUseConnections });
} else if (incomingServerDescription.error == null) {
const newTopologyType = topology.s.description.type;
const shouldMarkPoolReady =
Expand Down
2 changes: 1 addition & 1 deletion src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ export function maybeClearPinnedConnection(
);

if (options?.forceClear) {
loadBalancer.s.pool.clear(conn.serviceId);
loadBalancer.s.pool.clear({ serviceId: conn.serviceId });
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,43 @@ const LB_SKIP_TESTS: SkipDescription[] = [
'clearing a paused pool emits no events',
'after clear, cannot check out connections until pool ready',
'readying a ready pool emits no events',
'error during minPoolSize population clears pool'
'error during minPoolSize population clears pool',
'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)'
].map(description => ({
description,
skipIfCondition: 'loadBalanced',
skipReason: 'cannot run against a load balanced environment'
}));

const INTERRUPT_IN_USE_CONNECTIONS_TESTS: SkipDescription[] = [
'Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)',
dariakp marked this conversation as resolved.
Show resolved Hide resolved
'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)',
'clear with interruptInUseConnections = true closes pending connections'
].map(description => ({
description,
skipIfCondition: 'always',
skipReason: 'TODO(NODE-4691): cancel inflight operations when heartbeat fails'
}));
const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [
{
description: 'clear with interruptInUseConnections = true closes pending connections',
skipIfCondition: 'always',
skipReason: 'TODO(NODE-4784): track and kill pending connections'
},
{
description:
'Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections: false)',
skipIfCondition: 'always',
skipReason:
'NodeJS does not have a background thread responsible for managing connections, and so already checked in connections are not pruned when in-use connections are interrupted.'
}
];

describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');

runCmapTestSuite(tests, {
testsToSkip: LB_SKIP_TESTS.concat([
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
]).concat(INTERRUPT_IN_USE_CONNECTIONS_TESTS)
testsToSkip: LB_SKIP_TESTS.concat(
[
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
],
INTERRUPT_IN_USE_SKIPPED_TESTS
)
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,5 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('SDAM Unified Tests', function () {
const sdamPoolClearedTests = [
'Connection pool clear uses interruptInUseConnections=true after monitor timeout',
'Error returned from connection pool clear with interruptInUseConnections=true is retryable',
'Error returned from connection pool clear with interruptInUseConnections=true is retryable for write'
];
runUnifiedSuite(
loadSpecTests(path.join('server-discovery-and-monitoring', 'unified')),
({ description }) =>
sdamPoolClearedTests.includes(description)
? 'TODO(NODE-4691): interrupt in-use operations on heartbeat failure'
: false
);
runUnifiedSuite(loadSpecTests(path.join('server-discovery-and-monitoring', 'unified')));
});
6 changes: 2 additions & 4 deletions test/tools/cmap_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,8 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({

return threadContext.pool.checkIn(connection);
},
// eslint-disable-next-line @typescript-eslint/no-unused-vars
clear: function (interruptInUseConnections: boolean) {
// TODO(NODE-4619): pass interruptInUseConnections into clear pool method
return threadContext.pool.clear();
clear: function ({ interruptInUseConnections }: { interruptInUseConnections: boolean }) {
return threadContext.pool.clear({ interruptInUseConnections });
},
close: async function () {
return await promisify(ConnectionPool.prototype.close).call(threadContext.pool);
Expand Down
4 changes: 3 additions & 1 deletion test/unit/sdam/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ describe('Server', () => {
expect(newDescription).to.have.nested.property('[0].type', ServerType.Unknown);
} else {
expect(newDescription).to.be.undefined;
expect(server.s.pool.clear).to.have.been.calledOnceWith(connection!.serviceId);
expect(server.s.pool.clear).to.have.been.calledOnceWith({
serviceId: connection!.serviceId
});
}
});

Expand Down