Skip to content

Commit

Permalink
feat(NODE-2993): implement maxConnecting (#3255)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariakp committed May 23, 2022
1 parent b2798d9 commit c9d3816
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 74 deletions.
117 changes: 62 additions & 55 deletions src/cmap/connection_pool.ts
Expand Up @@ -107,7 +107,7 @@ export type ConnectionPoolEvents = {
*/
export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
closed: boolean;
options: Readonly<ConnectionPoolOptions>;
options: Readonly<ConnectionPoolOptions & { maxConnecting: number }>;
/** @internal */
[kLogger]: Logger;
/** @internal */
Expand Down Expand Up @@ -199,6 +199,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
connectionType: Connection,
maxPoolSize: options.maxPoolSize ?? 100,
minPoolSize: options.minPoolSize ?? 0,
maxConnecting: 2,
maxIdleTimeMS: options.maxIdleTimeMS ?? 0,
waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,
autoEncrypter: options.autoEncrypter,
Expand Down Expand Up @@ -494,16 +495,29 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

function ensureMinPoolSize(pool: ConnectionPool) {
if (pool.closed || pool.options.minPoolSize === 0) {
const minPoolSize = pool.options.minPoolSize;
if (pool.closed || minPoolSize === 0) {
return;
}

const minPoolSize = pool.options.minPoolSize;
for (let i = pool.totalConnectionCount; i < minPoolSize; ++i) {
createConnection(pool);
if (
pool.totalConnectionCount < minPoolSize &&
pool.pendingConnectionCount < pool.options.maxConnecting
) {
// NOTE: ensureMinPoolSize should not try to get all the pending
// connection permits because that potentially delays the availability of
// the connection to a checkout request
createConnection(pool, (err, connection) => {
pool[kPending]--;
if (!err && connection) {
pool[kConnections].push(connection);
process.nextTick(processWaitQueue, pool);
}
pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10);
});
} else {
pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 100);
}

pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10);
}

function connectionIsStale(pool: ConnectionPool, connection: Connection) {
Expand All @@ -521,7 +535,7 @@ function connectionIsIdle(pool: ConnectionPool, connection: Connection) {
return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS);
}

function createConnection(pool: ConnectionPool, callback?: Callback<Connection>) {
function createConnection(pool: ConnectionPool, callback: Callback<Connection>) {
const connectOptions: ConnectionOptions = {
...pool.options,
id: pool[kConnectionCounter].next().value,
Expand All @@ -530,14 +544,16 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
};

pool[kPending]++;
// This is our version of a "virtual" no-I/O connection as the spec requires
pool.emit(
ConnectionPool.CONNECTION_CREATED,
new ConnectionCreatedEvent(pool, { id: connectOptions.id })
);

connect(connectOptions, (err, connection) => {
if (err || !connection) {
pool[kPending]--;
pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
if (typeof callback === 'function') {
callback(err);
}

callback(err);
return;
}

Expand All @@ -553,8 +569,6 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
connection.on(event, (e: any) => pool.emit(event, e));
}

pool.emit(ConnectionPool.CONNECTION_CREATED, new ConnectionCreatedEvent(pool, connection));

if (pool.loadBalanced) {
connection.on(Connection.PINNED, pinType => pool[kMetrics].markPinned(pinType));
connection.on(Connection.UNPINNED, pinType => pool[kMetrics].markUnpinned(pinType));
Expand All @@ -575,16 +589,8 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
connection.markAvailable();
pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection));

// if a callback has been provided, hand off the connection immediately
if (typeof callback === 'function') {
callback(undefined, connection);
return;
}

// otherwise add it to the pool for later acquisition, and try to process the wait queue
pool[kConnections].push(connection);
pool[kPending]--;
process.nextTick(processWaitQueue, pool);
callback(undefined, connection);
return;
});
}

Expand Down Expand Up @@ -642,44 +648,45 @@ function processWaitQueue(pool: ConnectionPool) {
}
}

const maxPoolSize = pool.options.maxPoolSize;
if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) {
const { maxPoolSize, maxConnecting } = pool.options;
while (
pool.waitQueueSize > 0 &&
pool.pendingConnectionCount < maxConnecting &&
(maxPoolSize === 0 || pool.totalConnectionCount < maxPoolSize)
) {
const waitQueueMember = pool[kWaitQueue].shift();
if (!waitQueueMember || waitQueueMember[kCancelled]) {
continue;
}
createConnection(pool, (err, connection) => {
const waitQueueMember = pool[kWaitQueue].shift();
if (!waitQueueMember || waitQueueMember[kCancelled]) {
pool[kPending]--;
if (waitQueueMember[kCancelled]) {
if (!err && connection) {
pool[kConnections].push(connection);
pool[kPending]--;
}
} else {
if (err) {
pool.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(pool, err)
);
} else if (connection) {
pool[kCheckedOut]++;
pool.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(pool, connection)
);
}

pool[kProcessingWaitQueue] = false;
return;
}

if (err) {
pool.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(pool, err)
);
} else if (connection) {
pool[kCheckedOut]++;
pool[kPending]--;
pool.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(pool, connection)
);
}

if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.callback(err, connection);
}
waitQueueMember.callback(err, connection);
pool[kProcessingWaitQueue] = false;
process.nextTick(() => processWaitQueue(pool));
process.nextTick(processWaitQueue, pool);
});
} else {
pool[kProcessingWaitQueue] = false;
}
pool[kProcessingWaitQueue] = false;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/connection_pool_events.ts
Expand Up @@ -59,7 +59,7 @@ export class ConnectionCreatedEvent extends ConnectionPoolMonitoringEvent {
connectionId: number | '<monitor>';

/** @internal */
constructor(pool: ConnectionPool, connection: Connection) {
constructor(pool: ConnectionPool, connection: { id: number | '<monitor>' }) {
super(pool);
this.connectionId = connection.id;
}
Expand Down
Expand Up @@ -33,16 +33,16 @@
"address": 42,
"currentCheckedOutCount": 0,
"availableConnectionCount": 0,
"pendingConnectionCount": 3,
"totalConnectionCount": 3
"pendingConnectionCount": 1,
"totalConnectionCount": 1
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42,
"availableConnectionCount": 1,
"pendingConnectionCount": 2,
"totalConnectionCount": 3
"pendingConnectionCount": 1,
"totalConnectionCount": 2
},
{
"type": "ConnectionCreated",
Expand Down
@@ -0,0 +1,77 @@
{
"version": 1,
"style": "unit",
"description": "must replace removed connections up to minPoolSize",
"poolOptions": {
"minPoolSize": 2
},
"operations": [
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 2
},
{
"name": "wait",
"ms": 1000
},
{
"name": "checkOut",
"label": "conn"
},
{
"name": "clear"
},
{
"name": "checkIn",
"connection": "conn"
},
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 3
}
],
"events": [
{
"type": "ConnectionReady",
"address": 42
},
{
"type": "ConnectionReady",
"address": 42
},
{
"type": "ConnectionCheckedOut",
"address": 42
},
{
"type": "ConnectionPoolCleared",
"address": 42
},
{
"type": "ConnectionCheckedIn",
"address": 42
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42,
"availableConnectionCount": 1,
"pendingConnectionCount": 0,
"totalConnectionCount": 1
},
{
"type": "ConnectionReady",
"address": 42,
"availableConnectionCount": 1,
"pendingConnectionCount": 1,
"totalConnectionCount": 2
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionCreated",
"ConnectionCheckOutStarted"
]
}
Expand Up @@ -15,9 +15,14 @@ const LB_SKIP_TESTS: SkipDescription[] = [
describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');

runCmapTestSuite(
// TODO(NODE-2993): unskip integration tests for maxConnecting
tests.filter(({ style }) => style === 'unit'),
{ testsToSkip: LB_SKIP_TESTS }
);
runCmapTestSuite(tests, {
testsToSkip: LB_SKIP_TESTS.concat([
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
])
});
});
Expand Up @@ -6,5 +6,14 @@ describe('Connection Monitoring and Pooling (Node Driver)', function () {
'../integration/connection-monitoring-and-pooling/cmap-node-specs'
);

runCmapTestSuite(tests, { injectPoolStats: true });
runCmapTestSuite(tests, {
injectPoolStats: true,
testsToSkip: [
{
description: 'must replace removed connections up to minPoolSize',
skipIfCondition: 'loadBalanced',
skipReason: 'cannot run against load balancer due to reliance on pool.clear() command'
}
]
});
});
24 changes: 16 additions & 8 deletions test/tools/cmap_spec_runner.ts
Expand Up @@ -116,12 +116,14 @@ class Thread {
await sleep();
}

queue(op: CmapOperation) {
queue(op: CmapOperation, thread?: Thread) {
if (this.#killed || this.#error) {
return;
}

this.#promise = this.#promise.then(() => this._runOperation(op)).catch(e => (this.#error = e));
const functionToQueue = () => (!thread ? this._runOperation(op) : thread.queue(op));

this.#promise = this.#promise.then(functionToQueue).catch(e => (this.#error = e));
}

async finish() {
Expand Down Expand Up @@ -352,9 +354,12 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
const op = operations[idx];

const threadKey = op.name === 'checkOut' ? op.thread || MAIN_THREAD_KEY : MAIN_THREAD_KEY;
const thread = threadContext.getThread(threadKey);

thread.queue(op);
if (threadKey === MAIN_THREAD_KEY) {
mainThread.queue(op);
} else {
const thread = threadContext.getThread(threadKey);
mainThread.queue(op, thread);
}
}

await mainThread.finish().catch(e => {
Expand Down Expand Up @@ -387,6 +392,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
);

expect(actualEvents).to.have.lengthOf(expectedEvents.length);

for (const expected of expectedEvents) {
const actual = actualEvents.shift();
const { type: eventType, ...eventPropsToCheck } = expected;
Expand All @@ -397,7 +403,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {

export type SkipDescription = {
description: string;
skipIfCondition: 'loadBalanced';
skipIfCondition: 'loadBalanced' | 'always';
skipReason: string;
};

Expand All @@ -416,10 +422,12 @@ export function runCmapTestSuite(
({ description }) => description === test.description
);
if (skipDescription) {
const alwaysSkip = skipDescription.skipIfCondition === 'always';
const matchesLoadBalanceSkip =
skipDescription.skipIfCondition === 'loadBalanced' && this.configuration.isLoadBalanced;
if (matchesLoadBalanceSkip) {
(this.currentTest as Mocha.Runnable).skipReason = skipDescription.skipReason;

if (alwaysSkip || matchesLoadBalanceSkip) {
this.currentTest.skipReason = skipDescription.skipReason;
this.skip();
}
}
Expand Down

0 comments on commit c9d3816

Please sign in to comment.