Skip to content

Commit

Permalink
refactor(NODE-2993): align internal cmap implementation with spec (#3248
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dariakp committed May 17, 2022
1 parent 48e0e6e commit e1e4377
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 123 deletions.
57 changes: 31 additions & 26 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ const kLogger = Symbol('logger');
/** @internal */
const kConnections = Symbol('connections');
/** @internal */
const kPermits = Symbol('permits');
const kPending = Symbol('pending');
/** @internal */
const kCheckedOut = Symbol('checkedOut');
/** @internal */
const kMinPoolSizeTimer = Symbol('minPoolSizeTimer');
/** @internal */
Expand All @@ -57,8 +59,6 @@ const kCancelled = Symbol('cancelled');
/** @internal */
const kMetrics = Symbol('metrics');
/** @internal */
const kCheckedOut = Symbol('checkedOut');
/** @internal */
const kProcessingWaitQueue = Symbol('processingWaitQueue');

/** @public */
Expand Down Expand Up @@ -112,11 +112,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
[kLogger]: Logger;
/** @internal */
[kConnections]: Denque<Connection>;
/**
* An integer expressing how many total connections are permitted
* @internal
*/
[kPermits]: number;
/** @internal */
[kPending]: number;
/** @internal */
[kCheckedOut]: number;
/** @internal */
[kMinPoolSizeTimer]?: NodeJS.Timeout;
/**
Expand All @@ -137,8 +136,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
/** @internal */
[kMetrics]: ConnectionPoolMetrics;
/** @internal */
[kCheckedOut]: number;
/** @internal */
[kProcessingWaitQueue]: boolean;

/**
Expand Down Expand Up @@ -216,7 +213,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

this[kLogger] = new Logger('ConnectionPool');
this[kConnections] = new Denque();
this[kPermits] = this.options.maxPoolSize;
this[kPending] = 0;
this[kCheckedOut] = 0;
this[kMinPoolSizeTimer] = undefined;
this[kGeneration] = 0;
this[kServiceGenerations] = new Map();
Expand All @@ -225,7 +223,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this[kCancellationToken].setMaxListeners(Infinity);
this[kWaitQueue] = new Denque();
this[kMetrics] = new ConnectionPoolMetrics();
this[kCheckedOut] = 0;
this[kProcessingWaitQueue] = false;

process.nextTick(() => {
Expand All @@ -244,16 +241,26 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return this[kGeneration];
}

/** An integer expressing how many total connections (active + in use) the pool currently has */
/** An integer expressing how many total connections (available + pending + in use) the pool currently has */
get totalConnectionCount(): number {
return this[kConnections].length + (this.options.maxPoolSize - this[kPermits]);
return (
this.availableConnectionCount + this.pendingConnectionCount + this.currentCheckedOutCount
);
}

/** An integer expressing how many connections are currently available in the pool. */
get availableConnectionCount(): number {
return this[kConnections].length;
}

get pendingConnectionCount(): number {
return this[kPending];
}

get currentCheckedOutCount(): number {
return this[kCheckedOut];
}

get waitQueueSize(): number {
return this[kWaitQueue].length;
}
Expand All @@ -266,10 +273,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return this[kServiceGenerations];
}

get currentCheckedOutCount(): number {
return this[kCheckedOut];
}

/**
* Get the metrics information for the pool when a wait queue timeout occurs.
*/
Expand Down Expand Up @@ -319,7 +322,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}, waitQueueTimeoutMS);
}

this[kCheckedOut] = this[kCheckedOut] + 1;
this[kWaitQueue].push(waitQueueMember);
process.nextTick(processWaitQueue, this);
}
Expand All @@ -339,7 +341,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this[kConnections].unshift(connection);
}

this[kCheckedOut] = this[kCheckedOut] - 1;
this[kCheckedOut]--;
this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection));

if (willDestroy) {
Expand Down Expand Up @@ -527,10 +529,10 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
cancellationToken: pool[kCancellationToken]
};

pool[kPermits]--;
pool[kPending]++;
connect(connectOptions, (err, connection) => {
if (err || !connection) {
pool[kPermits]++;
pool[kPending]--;
pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
if (typeof callback === 'function') {
callback(err);
Expand All @@ -541,6 +543,7 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)

// The pool might have closed since we started trying to create a connection
if (pool.closed) {
pool[kPending]--;
connection.destroy({ force: true });
return;
}
Expand Down Expand Up @@ -572,24 +575,22 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
connection.markAvailable();
pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection));

// if a callback has been provided, check out the connection immediately
// if a callback has been provided, hand off the connection immediately
if (typeof callback === 'function') {
callback(undefined, connection);
return;
}

// otherwise add it to the pool for later acquisition, and try to process the wait queue
pool[kConnections].push(connection);
pool[kPending]--;
process.nextTick(processWaitQueue, pool);
});
}

function destroyConnection(pool: ConnectionPool, connection: Connection, reason: string) {
pool.emit(ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(pool, connection, reason));

// allow more connections to be created
pool[kPermits]++;

// destroy the connection
process.nextTick(() => connection.destroy());
}
Expand Down Expand Up @@ -624,6 +625,7 @@ function processWaitQueue(pool: ConnectionPool) {
const isStale = connectionIsStale(pool, connection);
const isIdle = connectionIsIdle(pool, connection);
if (!isStale && !isIdle && !connection.closed) {
pool[kCheckedOut]++;
pool.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(pool, connection)
Expand All @@ -647,6 +649,7 @@ function processWaitQueue(pool: ConnectionPool) {
if (!waitQueueMember || waitQueueMember[kCancelled]) {
if (!err && connection) {
pool[kConnections].push(connection);
pool[kPending]--;
}

pool[kProcessingWaitQueue] = false;
Expand All @@ -659,6 +662,8 @@ function processWaitQueue(pool: ConnectionPool) {
new ConnectionCheckOutFailedEvent(pool, err)
);
} else if (connection) {
pool[kCheckedOut]++;
pool[kPending]--;
pool.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(pool, connection)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"version": 1,
"style": "unit",
"description": "must correctly update pool stats when checking in a connection",
"poolOptions": {
"minPoolSize": 3
},
"operations": [
{
"name": "waitForEvent",
"event": "ConnectionCreated",
"count": 3
},
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 3
},
{
"name": "checkOut",
"label": "conn"
},
{
"name": "checkIn",
"connection": "conn"
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42,
"totalConnectionCount": 0,
"availableConnectionCount": 0,
"pendingConnectionCount": 0,
"currentCheckedOutCount": 0
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42,
"address": 42
},
{
"type": "ConnectionCheckedIn",
"connectionId": 42,
"address": 42,
"currentCheckedOutCount": 0,
"availableConnectionCount": 3,
"pendingConnectionCount": 0,
"totalConnectionCount": 3
}
],
"ignore": [
"ConnectionReady",
"ConnectionClosed",
"ConnectionCheckOutStarted"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"version": 1,
"style": "unit",
"description": "must correctly update pool stats when checking out a connection",
"poolOptions": {
"minPoolSize": 3
},
"operations": [
{
"name": "waitForEvent",
"event": "ConnectionCreated",
"count": 3
},
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 3
},
{
"name": "checkOut"
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42,
"currentCheckedOutCount": 0,
"availableConnectionCount": 0,
"pendingConnectionCount": 0,
"totalConnectionCount": 0
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42
},
{
"type": "ConnectionCheckedOut",
"connectionId": 42,
"address": 42,
"currentCheckedOutCount": 1,
"availableConnectionCount": 2,
"pendingConnectionCount": 0,
"totalConnectionCount": 3
}
],
"ignore": [
"ConnectionReady",
"ConnectionClosed",
"ConnectionCheckOutStarted"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"version": 1,
"style": "unit",
"description": "must correctly update pool stats when populating the pool up to minPoolSize",
"poolOptions": {
"minPoolSize": 3
},
"operations": [
{
"name": "waitForEvent",
"event": "ConnectionCreated",
"count": 3
},
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 3
}
],
"events": [
{
"type": "ConnectionPoolCreated",
"address": 42,
"options": 42,
"currentCheckedOutCount": 0,
"availableConnectionCount": 0,
"pendingConnectionCount": 0,
"totalConnectionCount": 0
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42,
"currentCheckedOutCount": 0,
"availableConnectionCount": 0,
"pendingConnectionCount": 3,
"totalConnectionCount": 3
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42,
"availableConnectionCount": 1,
"pendingConnectionCount": 2,
"totalConnectionCount": 3
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42,
"availableConnectionCount": 2,
"pendingConnectionCount": 1,
"totalConnectionCount": 3
}
],
"ignore": [
"ConnectionReady",
"ConnectionClosed"
]
}

0 comments on commit e1e4377

Please sign in to comment.