Skip to content

Commit

Permalink
feat(NODE-4385): add cmap pool pausing functionality (#3321)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariakp committed Sep 7, 2022
1 parent f4702f4 commit 335ee55
Show file tree
Hide file tree
Showing 25 changed files with 368 additions and 149 deletions.
131 changes: 82 additions & 49 deletions src/cmap/connection_pool.ts
Expand Up @@ -13,6 +13,7 @@ import {
CONNECTION_POOL_CLEARED,
CONNECTION_POOL_CLOSED,
CONNECTION_POOL_CREATED,
CONNECTION_POOL_READY,
CONNECTION_READY
} from '../constants';
import { MongoError, MongoInvalidArgumentError, MongoRuntimeError } from '../error';
Expand All @@ -31,9 +32,10 @@ import {
ConnectionPoolClearedEvent,
ConnectionPoolClosedEvent,
ConnectionPoolCreatedEvent,
ConnectionPoolReadyEvent,
ConnectionReadyEvent
} from './connection_pool_events';
import { PoolClosedError, WaitQueueTimeoutError } from './errors';
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
import { ConnectionPoolMetrics } from './metrics';

/** @internal */
Expand Down Expand Up @@ -103,6 +105,7 @@ export interface CloseOptions {
/** @public */
export type ConnectionPoolEvents = {
connectionPoolCreated(event: ConnectionPoolCreatedEvent): void;
connectionPoolReady(event: ConnectionPoolReadyEvent): void;
connectionPoolClosed(event: ConnectionPoolClosedEvent): void;
connectionPoolCleared(event: ConnectionPoolClearedEvent): void;
connectionCreated(event: ConnectionCreatedEvent): void;
Expand Down Expand Up @@ -167,6 +170,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* @event
*/
static readonly CONNECTION_POOL_CLEARED = CONNECTION_POOL_CLEARED;
/**
* Emitted each time the connection pool is marked ready
* @event
*/
static readonly CONNECTION_POOL_READY = CONNECTION_POOL_READY;
/**
* Emitted when a connection is created.
* @event
Expand Down Expand Up @@ -242,7 +250,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

process.nextTick(() => {
this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
this.ensureMinPoolSize();
});
}

Expand Down Expand Up @@ -308,7 +315,13 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* Set the pool state to "ready"
*/
ready(): void {
if (this[kPoolState] !== PoolState.paused) {
return;
}
this[kPoolState] = PoolState.ready;
this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this));
clearTimeout(this[kMinPoolSizeTimer]);
this.ensureMinPoolSize();
}

/**
Expand All @@ -322,15 +335,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionCheckOutStartedEvent(this)
);

if (this.closed) {
this.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'poolClosed')
);
callback(new PoolClosedError(this));
return;
}

const waitQueueMember: WaitQueueMember = { callback };
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
if (waitQueueTimeoutMS) {
Expand Down Expand Up @@ -390,26 +394,40 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* previous generation will eventually be pruned during subsequent checkouts.
*/
clear(serviceId?: ObjectId): void {
if (this.closed) {
return;
}

// handle load balanced case
if (this.loadBalanced && serviceId) {
const sid = serviceId.toHexString();
const generation = this.serviceGenerations.get(sid);
// Only need to worry if the generation exists, since it should
// always be there but typescript needs the check.
if (generation == null) {
// TODO(NODE-3483)
throw new MongoRuntimeError('Service generations are required in load balancer mode.');
} else {
// Increment the generation for the service id.
this.serviceGenerations.set(sid, generation + 1);
}
} else {
this[kGeneration] += 1;
this.emit(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, serviceId)
);
return;
}

this.emit(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, serviceId)
);
// handle non load-balanced case
this[kGeneration] += 1;
const alreadyPaused = this[kPoolState] === PoolState.paused;
this[kPoolState] = PoolState.paused;

this.clearMinPoolSizeTimer();
this.processWaitQueue();

if (!alreadyPaused) {
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
}
}

/** Close the pool */
Expand All @@ -430,33 +448,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// immediately cancel any in-flight connections
this[kCancellationToken].emit('cancel');

// drain the wait queue
while (this.waitQueueSize) {
const waitQueueMember = this[kWaitQueue].pop();
if (waitQueueMember) {
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
if (!waitQueueMember[kCancelled]) {
// TODO(NODE-3483): Replace with MongoConnectionPoolClosedError
waitQueueMember.callback(new MongoRuntimeError('Connection pool closed'));
}
}
}

// clear the min pool size timer
const minPoolSizeTimer = this[kMinPoolSizeTimer];
if (minPoolSizeTimer) {
clearTimeout(minPoolSizeTimer);
}

// end the connection counter
if (typeof this[kConnectionCounter].return === 'function') {
this[kConnectionCounter].return(undefined);
}

// mark the pool as closed immediately
this[kPoolState] = PoolState.closed;
this.clearMinPoolSizeTimer();
this.processWaitQueue();

eachAsync<Connection>(
this[kConnections].toArray(),
(conn, cb) => {
Expand Down Expand Up @@ -526,12 +526,19 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
});
}

/** Clear the min pool size timer */
private clearMinPoolSizeTimer(): void {
const minPoolSizeTimer = this[kMinPoolSizeTimer];
if (minPoolSizeTimer) {
clearTimeout(minPoolSizeTimer);
}
}

private destroyConnection(connection: Connection, reason: string) {
this.emit(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, connection, reason)
);

// destroy the connection
process.nextTick(() => connection.destroy());
}
Expand Down Expand Up @@ -580,14 +587,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
connect(connectOptions, (err, connection) => {
if (err || !connection) {
this[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
callback(err);
this[kPending]--;
callback(err ?? new MongoRuntimeError('Connection creation failed without error'));
return;
}

// The pool might have closed since we started trying to create a connection
if (this.closed) {
if (this[kPoolState] !== PoolState.ready) {
this[kPending]--;
connection.destroy({ force: true });
callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this));
return;
}

Expand Down Expand Up @@ -616,17 +625,25 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
connection.markAvailable();
this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection));

this[kPending]--;
callback(undefined, connection);
return;
});
}

private ensureMinPoolSize() {
const minPoolSize = this.options.minPoolSize;
if (this.closed || minPoolSize === 0) {
if (this[kPoolState] !== PoolState.ready || minPoolSize === 0) {
return;
}

for (let i = 0; i < this[kConnections].length; i++) {
const connection = this[kConnections].peekAt(i);
if (connection && this.connectionIsPerished(connection)) {
this[kConnections].removeOne(i);
}
}

if (
this.totalConnectionCount < minPoolSize &&
this.pendingConnectionCount < this.options.maxConnecting
Expand All @@ -635,23 +652,25 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// connection permits because that potentially delays the availability of
// the connection to a checkout request
this.createConnection((err, connection) => {
this[kPending]--;
if (!err && connection) {
this[kConnections].push(connection);
process.nextTick(() => this.processWaitQueue());
}
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10);
if (this[kPoolState] === PoolState.ready) {
clearTimeout(this[kMinPoolSizeTimer]);
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 10);
}
});
} else {
clearTimeout(this[kMinPoolSizeTimer]);
this[kMinPoolSizeTimer] = setTimeout(() => this.ensureMinPoolSize(), 100);
}
}

private processWaitQueue() {
if (this.closed || this[kProcessingWaitQueue]) {
if (this[kProcessingWaitQueue]) {
return;
}

this[kProcessingWaitQueue] = true;

while (this.waitQueueSize) {
Expand All @@ -666,6 +685,21 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
continue;
}

if (this[kPoolState] !== PoolState.ready) {
const reason = this.closed ? 'poolClosed' : 'connectionError';
const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this);
this.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason)
);
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
this[kWaitQueue].shift();
waitQueueMember.callback(error);
continue;
}

if (!this.availableConnectionCount) {
break;
}
Expand Down Expand Up @@ -701,7 +735,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
continue;
}
this.createConnection((err, connection) => {
this[kPending]--;
if (waitQueueMember[kCancelled]) {
if (!err && connection) {
this[kConnections].push(connection);
Expand All @@ -710,7 +743,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (err) {
this.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, err)
new ConnectionCheckOutFailedEvent(this, 'connectionError')
);
} else if (connection) {
this[kCheckedOut]++;
Expand Down
12 changes: 12 additions & 0 deletions src/cmap/connection_pool_events.ts
Expand Up @@ -37,6 +37,18 @@ export class ConnectionPoolCreatedEvent extends ConnectionPoolMonitoringEvent {
}
}

/**
* An event published when a connection pool is ready
* @public
* @category Event
*/
export class ConnectionPoolReadyEvent extends ConnectionPoolMonitoringEvent {
/** @internal */
constructor(pool: ConnectionPool) {
super(pool);
}
}

/**
* An event published when a connection pool is closed
* @public
Expand Down
23 changes: 22 additions & 1 deletion src/cmap/errors.ts
@@ -1,4 +1,4 @@
import { MongoDriverError } from '../error';
import { MongoDriverError, MongoNetworkError } from '../error';
import type { ConnectionPool } from './connection_pool';

/**
Expand All @@ -19,6 +19,27 @@ export class PoolClosedError extends MongoDriverError {
}
}

/**
* An error indicating a connection pool is currently paused
* @category Error
*/
export class PoolClearedError extends MongoNetworkError {
// TODO(NODE-3144): needs to extend RetryableError or be marked retryable in some other way per spec
/** The address of the connection pool */
address: string;

constructor(pool: ConnectionPool) {
// TODO(NODE-3135): pass in original pool-clearing error and use in message
// "failed with: <original error which cleared the pool>"
super(`Connection pool for ${pool.address} was cleared because another operation failed`);
this.address = pool.address;
}

override get name(): string {
return 'MongoPoolClearedError';
}
}

/**
* An error thrown when a request to check out a connection times out
* @category Error
Expand Down
6 changes: 4 additions & 2 deletions src/constants.ts
Expand Up @@ -26,6 +26,7 @@ export const TOPOLOGY_DESCRIPTION_CHANGED = 'topologyDescriptionChanged' as cons
export const CONNECTION_POOL_CREATED = 'connectionPoolCreated' as const;
export const CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const;
export const CONNECTION_POOL_CLEARED = 'connectionPoolCleared' as const;
export const CONNECTION_POOL_READY = 'connectionPoolReady' as const;
export const CONNECTION_CREATED = 'connectionCreated' as const;
export const CONNECTION_READY = 'connectionReady' as const;
export const CONNECTION_CLOSED = 'connectionClosed' as const;
Expand Down Expand Up @@ -57,15 +58,16 @@ export const HEARTBEAT_EVENTS = Object.freeze([
/** @public */
export const CMAP_EVENTS = Object.freeze([
CONNECTION_POOL_CREATED,
CONNECTION_POOL_READY,
CONNECTION_POOL_CLEARED,
CONNECTION_POOL_CLOSED,
CONNECTION_CREATED,
CONNECTION_READY,
CONNECTION_CLOSED,
CONNECTION_CHECK_OUT_STARTED,
CONNECTION_CHECK_OUT_FAILED,
CONNECTION_CHECKED_OUT,
CONNECTION_CHECKED_IN,
CONNECTION_POOL_CLEARED
CONNECTION_CHECKED_IN
] as const);

/** @public */
Expand Down

0 comments on commit 335ee55

Please sign in to comment.