Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-2993): implement maxConnecting #3255

Merged
merged 14 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
117 changes: 62 additions & 55 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export type ConnectionPoolEvents = {
*/
export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
closed: boolean;
options: Readonly<ConnectionPoolOptions>;
options: Readonly<ConnectionPoolOptions & { maxConnecting: number }>;
dariakp marked this conversation as resolved.
Show resolved Hide resolved
/** @internal */
[kLogger]: Logger;
/** @internal */
Expand Down Expand Up @@ -199,6 +199,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
connectionType: Connection,
maxPoolSize: options.maxPoolSize ?? 100,
minPoolSize: options.minPoolSize ?? 0,
maxConnecting: 2,
maxIdleTimeMS: options.maxIdleTimeMS ?? 0,
waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,
autoEncrypter: options.autoEncrypter,
Expand Down Expand Up @@ -494,16 +495,29 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

function ensureMinPoolSize(pool: ConnectionPool) {
if (pool.closed || pool.options.minPoolSize === 0) {
const minPoolSize = pool.options.minPoolSize;
if (pool.closed || minPoolSize === 0) {
return;
}

const minPoolSize = pool.options.minPoolSize;
for (let i = pool.totalConnectionCount; i < minPoolSize; ++i) {
createConnection(pool);
if (
pool.totalConnectionCount < minPoolSize &&
pool.pendingConnectionCount < pool.options.maxConnecting
) {
// NOTE: ensureMinPoolSize should not try to get all the pending
// connection permits because that potentially delays the availability of
// the connection to a checkout request
createConnection(pool, (err, connection) => {
pool[kPending]--;
if (!err && connection) {
pool[kConnections].push(connection);
process.nextTick(processWaitQueue, pool);
}
pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10);
});
} else {
pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 100);
}

pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10);
}

function connectionIsStale(pool: ConnectionPool, connection: Connection) {
Expand All @@ -521,7 +535,7 @@ function connectionIsIdle(pool: ConnectionPool, connection: Connection) {
return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS);
}

function createConnection(pool: ConnectionPool, callback?: Callback<Connection>) {
function createConnection(pool: ConnectionPool, callback: Callback<Connection>) {
const connectOptions: ConnectionOptions = {
...pool.options,
id: pool[kConnectionCounter].next().value,
Expand All @@ -530,14 +544,16 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
};

pool[kPending]++;
// This is our version of a "virtual" no-I/O connection as the spec requires
dariakp marked this conversation as resolved.
Show resolved Hide resolved
pool.emit(
ConnectionPool.CONNECTION_CREATED,
new ConnectionCreatedEvent(pool, { id: connectOptions.id })
);

connect(connectOptions, (err, connection) => {
if (err || !connection) {
pool[kPending]--;
pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
if (typeof callback === 'function') {
callback(err);
}

callback(err);
return;
}

Expand All @@ -553,8 +569,6 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
connection.on(event, (e: any) => pool.emit(event, e));
}

pool.emit(ConnectionPool.CONNECTION_CREATED, new ConnectionCreatedEvent(pool, connection));

if (pool.loadBalanced) {
connection.on(Connection.PINNED, pinType => pool[kMetrics].markPinned(pinType));
connection.on(Connection.UNPINNED, pinType => pool[kMetrics].markUnpinned(pinType));
Expand All @@ -575,16 +589,8 @@ function createConnection(pool: ConnectionPool, callback?: Callback<Connection>)
connection.markAvailable();
pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection));

// if a callback has been provided, hand off the connection immediately
if (typeof callback === 'function') {
callback(undefined, connection);
return;
}

// otherwise add it to the pool for later acquisition, and try to process the wait queue
pool[kConnections].push(connection);
pool[kPending]--;
process.nextTick(processWaitQueue, pool);
callback(undefined, connection);
return;
});
}

Expand Down Expand Up @@ -642,44 +648,45 @@ function processWaitQueue(pool: ConnectionPool) {
}
}

const maxPoolSize = pool.options.maxPoolSize;
if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) {
const { maxPoolSize, maxConnecting } = pool.options;
while (
pool.waitQueueSize > 0 &&
pool.pendingConnectionCount < maxConnecting &&
(maxPoolSize === 0 || pool.totalConnectionCount < maxPoolSize)
) {
const waitQueueMember = pool[kWaitQueue].shift();
if (!waitQueueMember || waitQueueMember[kCancelled]) {
continue;
}
createConnection(pool, (err, connection) => {
const waitQueueMember = pool[kWaitQueue].shift();
if (!waitQueueMember || waitQueueMember[kCancelled]) {
pool[kPending]--;
if (waitQueueMember[kCancelled]) {
if (!err && connection) {
pool[kConnections].push(connection);
pool[kPending]--;
}
} else {
if (err) {
pool.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(pool, err)
);
} else if (connection) {
pool[kCheckedOut]++;
pool.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(pool, connection)
);
}

pool[kProcessingWaitQueue] = false;
return;
}

if (err) {
pool.emit(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(pool, err)
);
} else if (connection) {
pool[kCheckedOut]++;
pool[kPending]--;
pool.emit(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(pool, connection)
);
}

if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.callback(err, connection);
}
waitQueueMember.callback(err, connection);
pool[kProcessingWaitQueue] = false;
process.nextTick(() => processWaitQueue(pool));
process.nextTick(processWaitQueue, pool);
});
} else {
pool[kProcessingWaitQueue] = false;
}
pool[kProcessingWaitQueue] = false;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/connection_pool_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class ConnectionCreatedEvent extends ConnectionPoolMonitoringEvent {
connectionId: number | '<monitor>';

/** @internal */
constructor(pool: ConnectionPool, connection: Connection) {
constructor(pool: ConnectionPool, connection: { id: number | '<monitor>' }) {
super(pool);
this.connectionId = connection.id;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@
"address": 42,
"currentCheckedOutCount": 0,
"availableConnectionCount": 0,
"pendingConnectionCount": 3,
"totalConnectionCount": 3
"pendingConnectionCount": 1,
"totalConnectionCount": 1
},
{
"type": "ConnectionCreated",
"connectionId": 42,
"address": 42,
"availableConnectionCount": 1,
"pendingConnectionCount": 2,
"totalConnectionCount": 3
"pendingConnectionCount": 1,
"totalConnectionCount": 2
},
{
"type": "ConnectionCreated",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"version": 1,
"style": "unit",
"description": "must replace removed connections up to minPoolSize",
"poolOptions": {
"minPoolSize": 2
},
"operations": [
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 2
},
{
"name": "wait",
"ms": 1000
},
{
"name": "checkOut",
"label": "conn"
},
{
"name": "clear"
},
{
"name": "checkIn",
"connection": "conn"
},
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 3
}
],
"events": [
{
"type": "ConnectionReady",
"address": 42
},
{
"type": "ConnectionReady",
"address": 42
},
{
"type": "ConnectionCheckedOut",
"address": 42
},
{
"type": "ConnectionPoolCleared",
"address": 42
},
{
"type": "ConnectionCheckedIn",
"address": 42
},
{
"type": "ConnectionClosed",
"reason": "stale",
"address": 42,
"availableConnectionCount": 1,
"pendingConnectionCount": 0,
"totalConnectionCount": 1
},
{
"type": "ConnectionReady",
"address": 42,
"availableConnectionCount": 1,
"pendingConnectionCount": 1,
"totalConnectionCount": 2
}
],
"ignore": [
"ConnectionPoolCreated",
"ConnectionCreated",
"ConnectionCheckOutStarted"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ const LB_SKIP_TESTS: SkipDescription[] = [
describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');

runCmapTestSuite(
// TODO(NODE-2993): unskip integration tests for maxConnecting
tests.filter(({ style }) => style === 'unit'),
{ testsToSkip: LB_SKIP_TESTS }
);
runCmapTestSuite(tests, {
testsToSkip: LB_SKIP_TESTS.concat([
{
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
skipIfCondition: 'always',
skipReason:
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
}
])
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,14 @@ describe('Connection Monitoring and Pooling (Node Driver)', function () {
'../integration/connection-monitoring-and-pooling/cmap-node-specs'
);

runCmapTestSuite(tests, { injectPoolStats: true });
runCmapTestSuite(tests, {
injectPoolStats: true,
testsToSkip: [
{
description: 'must replace removed connections up to minPoolSize',
skipIfCondition: 'loadBalanced',
skipReason: 'cannot run against load balancer due to reliance on pool.clear() command'
}
]
});
});
24 changes: 16 additions & 8 deletions test/tools/cmap_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,14 @@ class Thread {
await sleep();
}

queue(op: CmapOperation) {
queue(op: CmapOperation, thread?: Thread) {
if (this.#killed || this.#error) {
return;
}

this.#promise = this.#promise.then(() => this._runOperation(op)).catch(e => (this.#error = e));
const functionToQueue = () => (!thread ? this._runOperation(op) : thread.queue(op));

this.#promise = this.#promise.then(functionToQueue).catch(e => (this.#error = e));
}

async finish() {
Expand Down Expand Up @@ -352,9 +354,12 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
const op = operations[idx];

const threadKey = op.name === 'checkOut' ? op.thread || MAIN_THREAD_KEY : MAIN_THREAD_KEY;
const thread = threadContext.getThread(threadKey);

thread.queue(op);
if (threadKey === MAIN_THREAD_KEY) {
mainThread.queue(op);
} else {
const thread = threadContext.getThread(threadKey);
mainThread.queue(op, thread);
}
}

await mainThread.finish().catch(e => {
Expand Down Expand Up @@ -387,6 +392,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {
);

expect(actualEvents).to.have.lengthOf(expectedEvents.length);

for (const expected of expectedEvents) {
const actual = actualEvents.shift();
const { type: eventType, ...eventPropsToCheck } = expected;
Expand All @@ -397,7 +403,7 @@ async function runCmapTest(test: CmapTest, threadContext: ThreadContext) {

export type SkipDescription = {
description: string;
skipIfCondition: 'loadBalanced';
skipIfCondition: 'loadBalanced' | 'always';
skipReason: string;
};

Expand All @@ -416,10 +422,12 @@ export function runCmapTestSuite(
({ description }) => description === test.description
);
if (skipDescription) {
const alwaysSkip = skipDescription.skipIfCondition === 'always';
const matchesLoadBalanceSkip =
skipDescription.skipIfCondition === 'loadBalanced' && this.configuration.isLoadBalanced;
if (matchesLoadBalanceSkip) {
(this.currentTest as Mocha.Runnable).skipReason = skipDescription.skipReason;

if (alwaysSkip || matchesLoadBalanceSkip) {
this.currentTest.skipReason = skipDescription.skipReason;
this.skip();
}
}
Expand Down