diff --git a/.evergreen/config.yml b/.evergreen/config.yml index ac1e404bac3..672c41a1758 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -134,6 +134,44 @@ functions: MONGODB_API_VERSION="${MONGODB_API_VERSION}" \ NODE_VERSION=${NODE_VERSION} SKIP_DEPS=${SKIP_DEPS|1} NO_EXIT=${NO_EXIT|1} \ bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh + start-load-balancer: + - command: shell.exec + params: + script: | + DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} \ + bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh start + - command: expansions.update + params: + file: lb-expansion.yml + stop-load-balancer: + - command: shell.exec + params: + script: | + DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} \ + bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh stop + run-lb-tests: + - command: shell.exec + type: test + params: + working_dir: src + timeout_secs: 60 + script: | + ${PREPARE_SHELL} + + MONGODB_URI="${MONGODB_URI}" \ + AUTH=${AUTH} \ + SSL=${SSL} \ + UNIFIED=${UNIFIED} \ + MONGODB_API_VERSION="${MONGODB_API_VERSION}" \ + NODE_VERSION=${NODE_VERSION} \ + SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}" \ + MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}" \ + TOPOLOGY="${TOPOLOGY}" \ + SKIP_DEPS=${SKIP_DEPS|1} \ + NO_EXIT=${NO_EXIT|1} \ + TEST_NPM_SCRIPT="check:load-balancer" \ + FAKE_MONGODB_SERVICE_ID="true" \ + bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh run checks: - command: shell.exec type: test @@ -937,6 +975,20 @@ tasks: - func: install dependencies - func: bootstrap mongohoused - func: run data lake tests + - name: test-load-balancer + tags: + - latest + - sharded_cluster + - load_balancer + commands: + - func: install dependencies + - func: bootstrap mongo-orchestration + vars: + VERSION: latest + TOPOLOGY: sharded_cluster + - func: start-load-balancer + - func: run-lb-tests + - func: stop-load-balancer - name: test-auth-kerberos tags: - auth @@ -1760,6 +1812,7 @@ buildvariants: - test-latest-server-v1-api - 
test-atlas-connectivity - test-atlas-data-lake + - test-load-balancer - test-auth-kerberos - test-auth-ldap - test-ocsp-valid-cert-server-staples diff --git a/.evergreen/config.yml.in b/.evergreen/config.yml.in index e9fa0553ef2..dbb05f1cc3b 100644 --- a/.evergreen/config.yml.in +++ b/.evergreen/config.yml.in @@ -155,6 +155,47 @@ functions: NODE_VERSION=${NODE_VERSION} SKIP_DEPS=${SKIP_DEPS|1} NO_EXIT=${NO_EXIT|1} \ bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh + "start-load-balancer": + - command: shell.exec + params: + script: | + DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} \ + bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh start + - command: expansions.update + params: + file: lb-expansion.yml + + "stop-load-balancer": + - command: shell.exec + params: + script: | + DRIVERS_TOOLS=${DRIVERS_TOOLS} MONGODB_URI=${MONGODB_URI} \ + bash ${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh stop + + "run-lb-tests": + - command: shell.exec + type: test + params: + working_dir: src + timeout_secs: 60 + script: | + ${PREPARE_SHELL} + + MONGODB_URI="${MONGODB_URI}" \ + AUTH=${AUTH} \ + SSL=${SSL} \ + UNIFIED=${UNIFIED} \ + MONGODB_API_VERSION="${MONGODB_API_VERSION}" \ + NODE_VERSION=${NODE_VERSION} \ + SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}" \ + MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}" \ + TOPOLOGY="${TOPOLOGY}" \ + SKIP_DEPS=${SKIP_DEPS|1} \ + NO_EXIT=${NO_EXIT|1} \ + TEST_NPM_SCRIPT="check:load-balancer" \ + FAKE_MONGODB_SERVICE_ID="true" \ + bash ${PROJECT_DIRECTORY}/.evergreen/run-tests.sh + "run checks": - command: shell.exec type: test diff --git a/.evergreen/generate_evergreen_tasks.js b/.evergreen/generate_evergreen_tasks.js index 4a48f671615..28a5db1866b 100644 --- a/.evergreen/generate_evergreen_tasks.js +++ b/.evergreen/generate_evergreen_tasks.js @@ -42,7 +42,8 @@ const OPERATING_SYSTEMS = [ })); // TODO: NODE-3060: enable skipped tests on windows -const WINDOWS_SKIP_TAGS = new Set(['atlas-connect', 'auth']); +const 
WINDOWS_SKIP_TAGS = new Set(['atlas-connect', 'auth', 'load_balancer']); +const MACOS_SKIP_TAGS = new Set(['load_balancer']); const TASKS = []; const SINGLETON_TASKS = []; @@ -107,6 +108,23 @@ TASKS.push(...[ { func: 'run data lake tests' } ] }, + { + name: 'test-load-balancer', + tags: ['latest', 'sharded_cluster', 'load_balancer'], + commands: [ + { func: 'install dependencies' }, + { + func: 'bootstrap mongo-orchestration', + vars: { + VERSION: 'latest', + TOPOLOGY: 'sharded_cluster' + } + }, + { func: 'start-load-balancer' }, + { func: 'run-lb-tests' }, + { func: 'stop-load-balancer' } + ] + }, { name: 'test-auth-kerberos', tags: ['auth', 'kerberos'], @@ -429,11 +447,12 @@ const getTaskList = (() => { .filter(task => { if (task.name.match(/^aws/)) return false; - // skip unsupported tasks on windows + // skip unsupported tasks on windows or macos if ( - os.match(/^windows/) && - task.tags && - task.tags.filter(tag => WINDOWS_SKIP_TAGS.has(tag)).length + task.tags && ( + (os.match(/^windows/) && task.tags.filter(tag => WINDOWS_SKIP_TAGS.has(tag)).length) || + (os.match(/^macos/) && task.tags.filter(tag => MACOS_SKIP_TAGS.has(tag)).length) + ) ) { return false; } diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 2f2739f7b4c..8a4fef1271a 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -49,4 +49,4 @@ else . 
$DRIVERS_TOOLS/.evergreen/csfle/set-temp-creds.sh fi -MONGODB_API_VERSION=${MONGODB_API_VERSION} MONGODB_UNIFIED_TOPOLOGY=${UNIFIED} MONGODB_URI=${MONGODB_URI} npm run ${TEST_NPM_SCRIPT} +SINGLE_MONGOS_LB_URI=${SINGLE_MONGOS_LB_URI} MULTI_MONGOS_LB_URI=${MULTI_MONGOS_LB_URI} MONGODB_API_VERSION=${MONGODB_API_VERSION} MONGODB_UNIFIED_TOPOLOGY=${UNIFIED} MONGODB_URI=${MONGODB_URI} npm run ${TEST_NPM_SCRIPT} diff --git a/package.json b/package.json index 082a77cdd2f..e6aa33a9912 100644 --- a/package.json +++ b/package.json @@ -119,6 +119,7 @@ "check:ts": "tsc -v && tsc --noEmit", "check:atlas": "mocha --config \"test/manual/mocharc.json\" test/manual/atlas_connectivity.test.js", "check:adl": "mocha test/manual/data_lake.test.js", + "check:load-balancer": "mocha test/manual/load-balancer.test.js", "check:ocsp": "mocha --config \"test/manual/mocharc.json\" test/manual/ocsp_support.test.js", "check:kerberos": "mocha --config \"test/manual/mocharc.json\" test/manual/kerberos.test.js", "check:tls": "mocha --config \"test/manual/mocharc.json\" test/manual/tls_support.test.js", diff --git a/src/cmap/command_monitoring_events.ts b/src/cmap/command_monitoring_events.ts index 0140baf6991..41eaf65af5a 100644 --- a/src/cmap/command_monitoring_events.ts +++ b/src/cmap/command_monitoring_events.ts @@ -1,8 +1,7 @@ import { GetMore, KillCursor, Msg, WriteProtocolMessageType } from './commands'; import { calculateDurationInMs, deepCopy } from '../utils'; -import type { ConnectionPool } from './connection_pool'; import type { Connection } from './connection'; -import type { Document } from '../bson'; +import type { Document, ObjectId } from '../bson'; /** * An event indicating the start of a given @@ -17,6 +16,7 @@ export class CommandStartedEvent { command: Document; address: string; connectionId?: string | number; + serviceId?: ObjectId; /** * Create a started event @@ -25,10 +25,10 @@ export class CommandStartedEvent { * @param pool - the pool that originated the command * @param 
command - the command */ - constructor(pool: Connection | ConnectionPool, command: WriteProtocolMessageType) { + constructor(connection: Connection, command: WriteProtocolMessageType) { const cmd = extractCommand(command); const commandName = extractCommandName(cmd); - const { address, connectionId } = extractConnectionDetails(pool); + const { address, connectionId, serviceId } = extractConnectionDetails(connection); // TODO: remove in major revision, this is not spec behavior if (SENSITIVE_COMMANDS.has(commandName)) { @@ -38,11 +38,17 @@ export class CommandStartedEvent { this.address = address; this.connectionId = connectionId; + this.serviceId = serviceId; this.requestId = command.requestId; this.databaseName = databaseName(command); this.commandName = commandName; this.command = maybeRedact(commandName, cmd, cmd); } + + /* @internal */ + get hasServiceId(): boolean { + return !!this.serviceId; + } } /** @@ -57,6 +63,7 @@ export class CommandSucceededEvent { duration: number; commandName: string; reply: unknown; + serviceId?: ObjectId; /** * Create a succeeded event @@ -68,22 +75,28 @@ export class CommandSucceededEvent { * @param started - a high resolution tuple timestamp of when the command was first sent, to calculate duration */ constructor( - pool: Connection | ConnectionPool, + connection: Connection, command: WriteProtocolMessageType, reply: Document | undefined, started: number ) { const cmd = extractCommand(command); const commandName = extractCommandName(cmd); - const { address, connectionId } = extractConnectionDetails(pool); + const { address, connectionId, serviceId } = extractConnectionDetails(connection); this.address = address; this.connectionId = connectionId; + this.serviceId = serviceId; this.requestId = command.requestId; this.commandName = commandName; this.duration = calculateDurationInMs(started); this.reply = maybeRedact(commandName, cmd, extractReply(command, reply)); } + + /* @internal */ + get hasServiceId(): boolean { + return 
!!this.serviceId; + } } /** @@ -98,6 +111,8 @@ export class CommandFailedEvent { duration: number; commandName: string; failure: Error; + serviceId?: ObjectId; + /** * Create a failure event * @@ -108,23 +123,29 @@ export class CommandFailedEvent { * @param started - a high resolution tuple timestamp of when the command was first sent, to calculate duration */ constructor( - pool: Connection | ConnectionPool, + connection: Connection, command: WriteProtocolMessageType, error: Error | Document, started: number ) { const cmd = extractCommand(command); const commandName = extractCommandName(cmd); - const { address, connectionId } = extractConnectionDetails(pool); + const { address, connectionId, serviceId } = extractConnectionDetails(connection); this.address = address; this.connectionId = connectionId; + this.serviceId = serviceId; this.requestId = command.requestId; this.commandName = commandName; this.duration = calculateDurationInMs(started); this.failure = maybeRedact(commandName, cmd, error) as Error; } + + /* @internal */ + get hasServiceId(): boolean { + return !!this.serviceId; + } } /** Commands that we want to redact because of the sensitive nature of their contents */ @@ -300,13 +321,14 @@ function extractReply(command: WriteProtocolMessageType, reply?: Document) { return deepCopy(reply.result ? 
reply.result : reply); } -function extractConnectionDetails(connection: Connection | ConnectionPool) { +function extractConnectionDetails(connection: Connection) { let connectionId; if ('id' in connection) { connectionId = connection.id; } return { address: connection.address, + serviceId: connection.serviceId, connectionId }; } diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index b33ea781804..6620ade0d41 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -18,10 +18,12 @@ import { MIN_SUPPORTED_SERVER_VERSION } from './wire_protocol/constants'; import type { Document } from '../bson'; +import { ObjectId, Int32 } from '../bson'; import type { Socket, SocketConnectOpts } from 'net'; import type { TLSSocket, ConnectionOptions as TLSConnectionOpts } from 'tls'; -import { Int32 } from '../bson'; + +const FAKE_MONGODB_SERVICE_ID = process.env.FAKE_MONGODB_SERVICE_ID; /** @public */ export type Stream = Socket | TLSSocket; @@ -129,6 +131,25 @@ function performInitialHandshake( return; } + if (options.loadBalanced) { + // TODO: Durran: Remove when server support exists. (NODE-3431) + if (FAKE_MONGODB_SERVICE_ID) { + if (response.topologyVersion) { + response.serviceId = response.topologyVersion.processId; + } else { + response.serviceId = new ObjectId(); + } + } + if (!response.serviceId) { + return callback( + new MongoDriverError( + 'Driver attempted to initialize in load balancing mode, ' + + 'but the server does not support this mode.' + ) + ); + } + } + // NOTE: This is metadata attached to the connection while porting away from // handshake being done in the `Server` class. Likely, it should be // relocated, or at very least restructured. 
@@ -166,6 +187,7 @@ export interface HandshakeDocument extends Document { client: ClientMetadata; compression: string[]; saslSupportedMechs?: string; + loadBalanced: boolean; } function prepareHandshakeDocument(authContext: AuthContext, callback: Callback) { @@ -177,7 +199,8 @@ function prepareHandshakeDocument(authContext: AuthContext, callback: Callback { static readonly CLOSE = 'close' as const; /** @event */ static readonly MESSAGE = 'message' as const; + /** @event */ + static readonly PINNED = 'pinned' as const; + /** @event */ + static readonly UNPINNED = 'unpinned' as const; constructor(stream: Stream, options: ConnectionOptions) { super(); @@ -247,10 +250,22 @@ export class Connection extends TypedEventEmitter { this[kIsMaster] = response; } + get serviceId(): ObjectId | undefined { + return this.ismaster?.serviceId; + } + + get loadBalanced(): boolean { + return this.description.loadBalanced; + } + get generation(): number { return this[kGeneration] || 0; } + set generation(generation: number) { + this[kGeneration] = generation; + } + get idleTime(): number { return calculateDurationInMs(this[kLastUseTime]); } @@ -306,6 +321,9 @@ export class Connection extends TypedEventEmitter { options = { force: false }; } + this.removeAllListeners(Connection.PINNED); + this.removeAllListeners(Connection.UNPINNED); + options = Object.assign({ force: false }, options); if (this[kStream] == null || this.destroyed) { this.destroyed = true; @@ -351,7 +369,6 @@ export class Connection extends TypedEventEmitter { let clusterTime = this.clusterTime; let finalCmd = Object.assign({}, cmd); - const inTransaction = session && (session.inTransaction() || isTransactionCommand(finalCmd)); if (this.serverApi) { const { version, strict, deprecationErrors } = this.serverApi; @@ -369,18 +386,6 @@ export class Connection extends TypedEventEmitter { clusterTime = session.clusterTime; } - // We need to unpin any read or write commands that happen outside of a pinned - // transaction, so 
we check if we have a pinned transaction that is no longer - // active, and unpin for all except start or commit. - if ( - !session.transaction.isActive && - session.transaction.isPinned && - !finalCmd.startTransaction && - !finalCmd.commitTransaction - ) { - session.transaction.unpinServer(); - } - const err = applySession(session, finalCmd, options as CommandOptions); if (err) { return callback(err); @@ -416,35 +421,10 @@ export class Connection extends TypedEventEmitter { ? new Msg(cmdNs, finalCmd, commandOptions) : new Query(cmdNs, finalCmd, commandOptions); - const commandResponseHandler = inTransaction - ? (err?: AnyError, ...args: Document[]) => { - // We need to add a TransientTransactionError errorLabel, as stated in the transaction spec. - if ( - err && - err instanceof MongoNetworkError && - !err.hasErrorLabel('TransientTransactionError') - ) { - err.addErrorLabel('TransientTransactionError'); - } - - if ( - session && - !cmd.commitTransaction && - err && - err instanceof MongoError && - err.hasErrorLabel('TransientTransactionError') - ) { - session.transaction.unpinServer(); - } - - return callback(err, ...args); - } - : callback; - try { - write(this, message, commandOptions, commandResponseHandler); + write(this, message, commandOptions, callback); } catch (err) { - commandResponseHandler(err); + callback(err); } } @@ -680,8 +660,10 @@ export class CryptoConnection extends Connection { } } -function hasSessionSupport(conn: Connection) { - return conn.description.logicalSessionTimeoutMinutes != null; +/** @public */ +export function hasSessionSupport(conn: Connection): boolean { + const description = conn.description; + return description.logicalSessionTimeoutMinutes != null || !!description.loadBalanced; } function supportsOpMsg(conn: Connection) { diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index ac5f894d3eb..d8ce1f65fd5 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -1,6 +1,8 @@ import Denque 
= require('denque'); -import { Logger } from '../logger'; import { APM_EVENTS, Connection, ConnectionEvents, ConnectionOptions } from './connection'; +import type { ObjectId } from 'bson'; +import { Logger } from '../logger'; +import { ConnectionPoolMetrics } from './metrics'; import { connect } from './connect'; import { eachAsync, makeCounter, Callback } from '../utils'; import { MongoDriverError, MongoError } from '../error'; @@ -30,6 +32,8 @@ const kMinPoolSizeTimer = Symbol('minPoolSizeTimer'); /** @internal */ const kGeneration = Symbol('generation'); /** @internal */ +const kServiceGenerations = Symbol('serviceGenerations'); +/** @internal */ const kConnectionCounter = Symbol('connectionCounter'); /** @internal */ const kCancellationToken = Symbol('cancellationToken'); @@ -37,6 +41,12 @@ const kCancellationToken = Symbol('cancellationToken'); const kWaitQueue = Symbol('waitQueue'); /** @internal */ const kCancelled = Symbol('cancelled'); +/** @internal */ +const kMetrics = Symbol('metrics'); +/** @internal */ +const kCheckedOut = Symbol('checkedOut'); +/** @internal */ +const kProcessingWaitQueue = Symbol('processingWaitQueue'); /** @public */ export interface ConnectionPoolOptions extends Omit { @@ -48,6 +58,8 @@ export interface ConnectionPoolOptions extends Omit { * @internal */ [kGeneration]: number; + /** A map of generations to service ids + * @internal + */ + [kServiceGenerations]: Map; /** @internal */ [kConnectionCounter]: Generator; /** @internal */ [kCancellationToken]: CancellationToken; /** @internal */ [kWaitQueue]: Denque; + /** @internal */ + [kMetrics]: ConnectionPoolMetrics; + /** @internal */ + [kCheckedOut]: number; + /** @internal */ + [kProcessingWaitQueue]: boolean; /** * Emitted when the connection pool is created. 
@@ -184,10 +206,14 @@ export class ConnectionPool extends TypedEventEmitter { this[kPermits] = this.options.maxPoolSize; this[kMinPoolSizeTimer] = undefined; this[kGeneration] = 0; + this[kServiceGenerations] = new Map(); this[kConnectionCounter] = makeCounter(1); this[kCancellationToken] = new CancellationToken(); this[kCancellationToken].setMaxListeners(Infinity); this[kWaitQueue] = new Denque(); + this[kMetrics] = new ConnectionPoolMetrics(); + this[kCheckedOut] = 0; + this[kProcessingWaitQueue] = false; process.nextTick(() => { this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this)); @@ -219,6 +245,25 @@ export class ConnectionPool extends TypedEventEmitter { return this[kWaitQueue].length; } + get loadBalanced(): boolean { + return this.options.loadBalanced; + } + + get serviceGenerations(): Map { + return this[kServiceGenerations]; + } + + get currentCheckedOutCount(): number { + return this[kCheckedOut]; + } + + /** + * Get the metrics information for the pool when a wait queue timeout occurs. + */ + private waitQueueErrorMetrics(): string { + return `. maxPoolSize: ${this.options.maxPoolSize}, ${this[kMetrics].info()}`; + } + /** * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or @@ -250,10 +295,16 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, 'timeout') ); - waitQueueMember.callback(new WaitQueueTimeoutError(this)); + waitQueueMember.callback( + new WaitQueueTimeoutError( + this.loadBalanced ? 
this.waitQueueErrorMetrics() : '', + this.address + ) + ); }, waitQueueTimeoutMS); } + this[kCheckedOut] = this[kCheckedOut] + 1; this[kWaitQueue].push(waitQueueMember); process.nextTick(processWaitQueue, this); } @@ -270,9 +321,10 @@ export class ConnectionPool extends TypedEventEmitter { if (!willDestroy) { connection.markAvailable(); - this[kConnections].push(connection); + this[kConnections].unshift(connection); } + this[kCheckedOut] = this[kCheckedOut] - 1; this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection)); if (willDestroy) { @@ -289,9 +341,23 @@ export class ConnectionPool extends TypedEventEmitter { * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a * previous generation will eventually be pruned during subsequent checkouts. */ - clear(): void { - this[kGeneration] += 1; - this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this)); + clear(serviceId?: ObjectId): void { + if (this.loadBalanced && serviceId) { + const sid = serviceId.toHexString(); + const generation = this.serviceGenerations.get(sid); + // Only need to worry if the generation exists, since it should + // always be there but typescript needs the check. + if (generation == null) { + throw new MongoDriverError('Service generations are required in load balancer mode.'); + } else { + // Increment the generation for the service id. 
+ this.serviceGenerations.set(sid, generation + 1); + } + } else { + this[kGeneration] += 1; + } + + this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this, serviceId)); } /** Close the pool */ @@ -338,7 +404,6 @@ export class ConnectionPool extends TypedEventEmitter { // mark the pool as closed immediately this.closed = true; - eachAsync( this[kConnections].toArray(), (conn, cb) => { @@ -365,7 +430,26 @@ export class ConnectionPool extends TypedEventEmitter { * @param fn - A function which operates on a managed connection * @param callback - The original callback */ - withConnection(fn: WithConnectionCallback, callback?: Callback): void { + withConnection( + conn: Connection | undefined, + fn: WithConnectionCallback, + callback?: Callback + ): void { + if (conn) { + // use the provided connection, and do _not_ check it in after execution + fn(undefined, conn, (fnErr, result) => { + if (typeof callback === 'function') { + if (fnErr) { + callback(fnErr); + } else { + callback(undefined, result); + } + } + }); + + return; + } + this.checkOut((err, conn) => { // don't callback with `err` here, we might want to act upon it inside `fn` fn(err as MongoError, conn, (fnErr, result) => { @@ -399,6 +483,13 @@ function ensureMinPoolSize(pool: ConnectionPool) { } function connectionIsStale(pool: ConnectionPool, connection: Connection) { + const serviceId = connection.serviceId; + if (pool.loadBalanced && serviceId) { + const sid = serviceId.toHexString(); + const generation = pool.serviceGenerations.get(sid); + return connection.generation !== generation; + } + return connection.generation !== pool[kGeneration]; } @@ -437,7 +528,29 @@ function createConnection(pool: ConnectionPool, callback?: Callback) connection.on(event, (e: any) => pool.emit(event, e)); } - pool.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionCreatedEvent(pool, connection)); + pool.emit(ConnectionPool.CONNECTION_CREATED, new ConnectionCreatedEvent(pool, connection)); + + if 
(pool.loadBalanced) { + connection.on(Connection.PINNED, pinType => { + pool[kMetrics].markPinned(pinType); + }); + connection.on(Connection.UNPINNED, pinType => { + pool[kMetrics].markUnpinned(pinType); + }); + + const serviceId = connection.serviceId; + if (serviceId) { + let generation; + const sid = serviceId.toHexString(); + if (!pool.serviceGenerations.has(sid)) { + generation = 0; + pool.serviceGenerations.set(sid, generation); + } else { + generation = pool.serviceGenerations.get(sid); + } + connection.generation = generation as number; + } + } connection.markAvailable(); pool.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(pool, connection)); @@ -465,10 +578,11 @@ function destroyConnection(pool: ConnectionPool, connection: Connection, reason: } function processWaitQueue(pool: ConnectionPool) { - if (pool.closed) { + if (pool.closed || pool[kProcessingWaitQueue]) { return; } + pool[kProcessingWaitQueue] = true; while (pool.waitQueueSize) { const waitQueueMember = pool[kWaitQueue].peekFront(); if (!waitQueueMember) { @@ -502,11 +616,11 @@ function processWaitQueue(pool: ConnectionPool) { } pool[kWaitQueue].shift(); - return waitQueueMember.callback(undefined, connection); + waitQueueMember.callback(undefined, connection); + } else { + const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle'; + destroyConnection(pool, connection, reason); } - - const reason = connection.closed ? 'error' : isStale ? 
'stale' : 'idle'; - destroyConnection(pool, connection, reason); } const maxPoolSize = pool.options.maxPoolSize; @@ -518,6 +632,7 @@ function processWaitQueue(pool: ConnectionPool) { pool[kConnections].push(connection); } + pool[kProcessingWaitQueue] = false; return; } @@ -537,9 +652,11 @@ function processWaitQueue(pool: ConnectionPool) { clearTimeout(waitQueueMember.timer); } waitQueueMember.callback(err, connection); + pool[kProcessingWaitQueue] = false; + process.nextTick(() => processWaitQueue(pool)); }); - - return; + } else { + pool[kProcessingWaitQueue] = false; } } @@ -565,7 +682,7 @@ export const CMAP_EVENTS = [ * @param callback - A function to call back after connection management is complete */ export type WithConnectionCallback = ( - error: MongoError, + error: MongoError | undefined, connection: Connection | undefined, callback: Callback ) => void; diff --git a/src/cmap/connection_pool_events.ts b/src/cmap/connection_pool_events.ts index 757d213c573..8083f6d2cd7 100644 --- a/src/cmap/connection_pool_events.ts +++ b/src/cmap/connection_pool_events.ts @@ -1,3 +1,4 @@ +import type { ObjectId } from '../bson'; import type { Connection } from './connection'; import type { ConnectionPool, ConnectionPoolOptions } from './connection_pool'; import type { AnyError } from '../error'; @@ -90,12 +91,14 @@ export class ConnectionClosedEvent extends ConnectionPoolMonitoringEvent { connectionId: number | ''; /** The reason the connection was closed */ reason: string; + serviceId?: ObjectId; /** @internal */ constructor(pool: ConnectionPool, connection: Connection, reason: string) { super(pool); this.connectionId = connection.id; this.reason = reason || 'unknown'; + this.serviceId = connection.serviceId; } } @@ -166,7 +169,11 @@ export class ConnectionCheckedInEvent extends ConnectionPoolMonitoringEvent { */ export class ConnectionPoolClearedEvent extends ConnectionPoolMonitoringEvent { /** @internal */ - constructor(pool: ConnectionPool) { + serviceId?: ObjectId; + + 
/** @internal */ + constructor(pool: ConnectionPool, serviceId?: ObjectId) { super(pool); + this.serviceId = serviceId; } } diff --git a/src/cmap/errors.ts b/src/cmap/errors.ts index d0a0463874c..b3820dab0ec 100644 --- a/src/cmap/errors.ts +++ b/src/cmap/errors.ts @@ -1,5 +1,4 @@ import { MongoDriverError } from '../error'; -import type { Connection } from './connection'; import type { ConnectionPool } from './connection_pool'; /** @@ -28,9 +27,9 @@ export class WaitQueueTimeoutError extends MongoDriverError { /** The address of the connection pool */ address: string; - constructor(pool: Connection | ConnectionPool) { - super('Timed out while checking out a connection from connection pool'); - this.address = pool.address; + constructor(message: string, address: string) { + super(`Timed out while checking out a connection from connection pool${message}`); + this.address = address; } get name(): string { diff --git a/src/cmap/metrics.ts b/src/cmap/metrics.ts new file mode 100644 index 00000000000..cb95ab8302b --- /dev/null +++ b/src/cmap/metrics.ts @@ -0,0 +1,56 @@ +/** @internal */ +export class ConnectionPoolMetrics { + static readonly TXN = 'txn' as const; + static readonly CURSOR = 'cursor' as const; + static readonly OTHER = 'other' as const; + + txnConnections: number; + cursorConnections: number; + otherConnections: number; + + /** + * Create the metrics object. + */ + constructor() { + this.txnConnections = 0; + this.cursorConnections = 0; + this.otherConnections = 0; + } + + /** + * Mark a connection as pinned for a specific operation. + */ + markPinned(pinType: string): void { + if (pinType === ConnectionPoolMetrics.TXN) { + this.txnConnections += 1; + } else if (pinType === ConnectionPoolMetrics.CURSOR) { + this.cursorConnections += 1; + } else { + this.otherConnections += 1; + } + } + + /** + * Unmark a connection as pinned for an operation. 
+ */ + markUnpinned(pinType: string): void { + if (pinType === ConnectionPoolMetrics.TXN) { + this.txnConnections -= 1; + } else if (pinType === ConnectionPoolMetrics.CURSOR) { + this.cursorConnections -= 1; + } else { + this.otherConnections -= 1; + } + } + + /** + * Return information about the cmap metrics as a string. + */ + info(): string { + return ( + `connections in use by cursors: ${this.cursorConnections},` + + `connections in use by transactions: ${this.txnConnections},` + + `connections in use by other operations: ${this.otherConnections}` + ); + } +} diff --git a/src/cmap/stream_description.ts b/src/cmap/stream_description.ts index fe92cb218d9..08bfd6dc4b5 100644 --- a/src/cmap/stream_description.ts +++ b/src/cmap/stream_description.ts @@ -15,6 +15,8 @@ const RESPONSE_FIELDS = [ /** @public */ export interface StreamDescriptionOptions { compressors?: CompressorName[]; + logicalSessionTimeoutMinutes?: number; + loadBalanced: boolean; } /** @public */ @@ -29,6 +31,7 @@ export class StreamDescription { compressors: CompressorName[]; compressor?: CompressorName; logicalSessionTimeoutMinutes?: number; + loadBalanced: boolean; __nodejs_mock_server__?: boolean; @@ -42,6 +45,8 @@ export class StreamDescription { this.maxBsonObjectSize = 16777216; this.maxMessageSizeBytes = 48000000; this.maxWriteBatchSize = 100000; + this.logicalSessionTimeoutMinutes = options?.logicalSessionTimeoutMinutes; + this.loadBalanced = !!options?.loadBalanced; this.compressors = options && options.compressors && Array.isArray(options.compressors) ? 
options.compressors diff --git a/src/connection_string.ts b/src/connection_string.ts index e4f771543f8..9dfacd1027a 100644 --- a/src/connection_string.ts +++ b/src/connection_string.ts @@ -33,6 +33,13 @@ import { Logger, LoggerLevel } from './logger'; import { PromiseProvider } from './promise_provider'; import { Encrypter } from './encrypter'; +const VALID_TXT_RECORDS = ['authSource', 'replicaSet', 'loadBalanced']; + +const LB_SINGLE_HOST_ERROR = 'loadBalanced option only supported with a single host in the URI'; +const LB_REPLICA_SET_ERROR = 'loadBalanced option not supported with a replicaSet option'; +const LB_DIRECT_CONNECTION_ERROR = + 'loadBalanced option not supported when directConnection is provided'; + /** * Determines whether a provided address matches the provided parent domain in order * to avoid certain attack vectors. @@ -85,6 +92,11 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback { if (err) { @@ -98,14 +110,15 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback key !== 'authSource' && key !== 'replicaSet')) { + if (txtRecordOptionKeys.some(key => !VALID_TXT_RECORDS.includes(key))) { return callback( - new MongoParseError('Text record must only set `authSource` or `replicaSet`') + new MongoParseError(`Text record may only set any of: ${VALID_TXT_RECORDS.join(', ')}`) ); } const source = txtRecordOptions.get('authSource') ?? undefined; const replicaSet = txtRecordOptions.get('replicaSet') ?? undefined; + const loadBalanced = txtRecordOptions.get('loadBalanced') ?? 
undefined; if (source === '' || replicaSet === '') { return callback(new MongoParseError('Cannot have empty URI params in DNS TXT Record')); @@ -118,6 +131,15 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback 1) { + return new MongoParseError(LB_SINGLE_HOST_ERROR); + } + if (mongoOptions.replicaSet) { + return new MongoParseError(LB_REPLICA_SET_ERROR); + } + if (mongoOptions.directConnection) { + return new MongoParseError(LB_DIRECT_CONNECTION_ERROR); + } + } +} + function setOption( mongoOptions: any, key: string, @@ -670,6 +714,10 @@ export const OPTIONS = { default: 120000, type: 'uint' }, + loadBalanced: { + default: false, + type: 'boolean' + }, localThresholdMS: { default: 15, type: 'uint' diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index e2addc9738a..181e6f69086 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1,7 +1,7 @@ import { Callback, maybePromise, MongoDBNamespace, ns } from '../utils'; import { Long, Document, BSONSerializeOptions, pluckBSONSerializeOptions } from '../bson'; -import { ClientSession } from '../sessions'; -import { MongoDriverError } from '../error'; +import { ClientSession, maybeClearPinnedConnection } from '../sessions'; +import { AnyError, MongoDriverError, MongoNetworkError } from '../error'; import { ReadPreference, ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { Topology } from '../sdam/topology'; @@ -211,6 +211,10 @@ export abstract class AbstractCursor< return this[kKilled]; } + get loadBalanced(): boolean { + return this[kTopology].loadBalanced; + } + /** Returns current buffered documents length */ bufferedCount(): number { return this[kDocuments].length; @@ -357,7 +361,7 @@ export abstract class AbstractCursor< }); } - close(): void; + close(): Promise; close(callback: Callback): void; close(options: CursorCloseOptions): Promise; close(options: CursorCloseOptions, callback: 
Callback): void; @@ -368,49 +372,7 @@ export abstract class AbstractCursor< const needsToEmitClosed = !this[kClosed]; this[kClosed] = true; - return maybePromise(callback, done => { - const cursorId = this[kId]; - const cursorNs = this[kNamespace]; - const server = this[kServer]; - const session = this[kSession]; - - if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) { - if (needsToEmitClosed) { - this[kId] = Long.ZERO; - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - this.emit(AbstractCursor.CLOSE); - } - - if (session && session.owner === this) { - return session.endSession(done); - } - - return done(); - } - - this[kKilled] = true; - server.killCursors( - cursorNs, - [cursorId], - { ...pluckBSONSerializeOptions(this[kOptions]), session }, - () => { - if (session && session.owner === this) { - return session.endSession(() => { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - this.emit(AbstractCursor.CLOSE); - done(); - }); - } - - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - this.emit(AbstractCursor.CLOSE); - done(); - } - ); - }); + return maybePromise(callback, done => cleanupCursor(this, { needsToEmitClosed }, done)); } /** @@ -714,7 +676,7 @@ function next(cursor: AbstractCursor, blocking: boolean, callback: Callback callback(err, nextDocument(cursor))); + return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument(cursor))); } next(cursor, blocking, callback); @@ -724,7 +686,7 @@ function next(cursor: AbstractCursor, blocking: boolean, callback: Callback callback(undefined, null)); + return cleanupCursor(cursor, undefined, () => callback(undefined, null)); } // otherwise need to call getMore @@ -741,7 +703,7 @@ function next(cursor: AbstractCursor, blocking: boolean, callback: Callback callback(err, nextDocument(cursor))); + return cleanupCursor(cursor, { error: err }, () => callback(err, 
nextDocument(cursor))); } if (cursor[kDocuments].length === 0 && blocking === false) { @@ -757,18 +719,71 @@ function cursorIsDead(cursor: AbstractCursor): boolean { return !!cursorId && cursorId.isZero(); } -function cleanupCursor(cursor: AbstractCursor, callback: Callback): void { - if (cursor[kDocuments].length === 0) { - cursor[kClosed] = true; - cursor.emit(AbstractCursor.CLOSE); +function cleanupCursor( + cursor: AbstractCursor, + options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined, + callback: Callback +): void { + const cursorId = cursor[kId]; + const cursorNs = cursor[kNamespace]; + const server = cursor[kServer]; + const session = cursor[kSession]; + const error = options?.error; + const needsToEmitClosed = + options?.needsToEmitClosed || + (options?.needsToEmitClosed == null && cursor[kDocuments].length === 0); + + if (error) { + if (cursor.loadBalanced && error instanceof MongoNetworkError) { + return completeCleanup(); + } } - const session = cursor[kSession]; - if (session && session.owner === cursor) { - session.endSession(callback); - } else { - callback(); + if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) { + if (needsToEmitClosed) { + cursor[kClosed] = true; + cursor[kId] = Long.ZERO; + cursor.emit(AbstractCursor.CLOSE); + } + + if (session) { + if (session.owner === cursor) { + return session.endSession({ error }, callback); + } + + if (!session.inTransaction()) { + maybeClearPinnedConnection(session, { error }); + } + } + + return callback(); } + + function completeCleanup() { + if (session) { + if (session.owner === cursor) { + return session.endSession({ error }, () => { + cursor.emit(AbstractCursor.CLOSE); + callback(); + }); + } + + if (!session.inTransaction()) { + maybeClearPinnedConnection(session, { error }); + } + } + + cursor.emit(AbstractCursor.CLOSE); + return callback(); + } + + cursor[kKilled] = true; + server.killCursors( + cursorNs, + [cursorId], + { 
...pluckBSONSerializeOptions(cursor[kOptions]), session }, + () => completeCleanup() + ); } /** @internal */ diff --git a/src/error.ts b/src/error.ts index c5112d54a95..6688421f21e 100644 --- a/src/error.ts +++ b/src/error.ts @@ -537,6 +537,12 @@ export function isNetworkErrorBeforeHandshake(err: MongoNetworkError): boolean { return err[kBeforeHandshake] === true; } +/** @public */ +export interface MongoNetworkErrorOptions { + /** Indicates the timeout happened before a connection handshake completed */ + beforeHandshake: boolean; +} + /** * An error indicating an issue with the network, including TCP errors and timeouts. * @public @@ -546,12 +552,15 @@ export class MongoNetworkError extends MongoError { /** @internal */ [kBeforeHandshake]?: boolean; - constructor(message: string | Error, options?: { beforeHandshake?: boolean }) { + constructor(message: string | Error, options?: MongoNetworkErrorOptions) { super(message); if (options && typeof options.beforeHandshake === 'boolean') { this[kBeforeHandshake] = options.beforeHandshake; } + + // TODO: must not apply when running `commitTransaction` + this.addErrorLabel('TransientTransactionError'); } get name(): string { @@ -559,12 +568,6 @@ export class MongoNetworkError extends MongoError { } } -/** @public */ -export interface MongoNetworkTimeoutErrorOptions { - /** Indicates the timeout happened before a connection handshake completed */ - beforeHandshake: boolean; -} - /** * An error indicating a network timeout occurred * @public @@ -574,7 +577,7 @@ export interface MongoNetworkTimeoutErrorOptions { * CSFLE has a dependency on this error with an instanceof check */ export class MongoNetworkTimeoutError extends MongoNetworkError { - constructor(message: string, options?: MongoNetworkTimeoutErrorOptions) { + constructor(message: string, options?: MongoNetworkErrorOptions) { super(message, options); } diff --git a/src/index.ts b/src/index.ts index d09c7f80f88..0c670301986 100644 --- a/src/index.ts +++ 
b/src/index.ts @@ -161,6 +161,7 @@ export type { GetMoreOptions, ConnectionEvents } from './cmap/connection'; +export type { ConnectionPoolMetrics } from './cmap/metrics'; export type { CloseOptions, ConnectionPoolOptions, @@ -187,7 +188,7 @@ export type { } from './cursor/abstract_cursor'; export type { DbPrivate, DbOptions } from './db'; export type { AutoEncryptionOptions, AutoEncrypter } from './deps'; -export type { AnyError, ErrorDescription, MongoNetworkTimeoutErrorOptions } from './error'; +export type { AnyError, ErrorDescription, MongoNetworkErrorOptions } from './error'; export type { Explain, ExplainOptions, ExplainVerbosityLike } from './explain'; export type { GridFSBucketReadStream, @@ -334,6 +335,7 @@ export type { ClientSession, ClientSessionEvents, ClientSessionOptions, + EndSessionOptions, ServerSessionPool, ServerSession, ServerSessionId, diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 4053584c62a..f0b2579660e 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -174,6 +174,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC retryWrites?: boolean; /** Allow a driver to force a Single topology type with a connection string containing one host */ directConnection?: boolean; + /** Instruct the driver it is connecting to a load balancer fronting a mongos like service */ + loadBalanced?: boolean; /** The write concern w value */ w?: W; @@ -661,6 +663,7 @@ export interface MongoOptions credentials?: MongoCredentials; readPreference: ReadPreference; readConcern: ReadConcern; + loadBalanced: boolean; serverApi: ServerApi; writeConcern: WriteConcern; dbName: string; diff --git a/src/operations/command.ts b/src/operations/command.ts index 9945ba0aa6b..1a5a11d319d 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -47,6 +47,9 @@ export interface CommandOperationOptions dbName?: string; authdb?: string; noResponse?: boolean; + + /** @internal Hints to `executeOperation` that this 
operation should not unpin on an ended transaction */ + bypassPinningCheck?: boolean; } /** @internal */ @@ -109,6 +112,10 @@ export abstract class CommandOperation extends AbstractOperation { return true; } + get bypassPinningCheck(): boolean { + return this.options.bypassPinningCheck || false; + } + abstract execute(server: Server, session: ClientSession, callback: Callback): void; executeCommand(server: Server, session: ClientSession, cmd: Document, callback: Callback): void { diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index c6baa30c072..657f5892da2 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -136,6 +136,15 @@ function executeWithServerSelection( return; } + if ( + session && + session.isPinned && + session.transaction.isCommitted && + !operation.bypassPinningCheck + ) { + session.unpin(); + } + const serverSelectionOptions = { session }; function callbackWithRetry(err?: any, result?: any) { if (err == null) { @@ -241,8 +250,9 @@ function shouldRetryWrite(err: any) { function supportsRetryableWrites(server: Server) { return ( - server.description.maxWireVersion >= 6 && - server.description.logicalSessionTimeoutMinutes && - server.description.type !== ServerType.Standalone + server.loadBalanced || + (server.description.maxWireVersion >= 6 && + server.description.logicalSessionTimeoutMinutes && + server.description.type !== ServerType.Standalone) ); } diff --git a/src/operations/operation.ts b/src/operations/operation.ts index c9259857e3f..01be8b43d4c 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -91,6 +91,10 @@ export abstract class AbstractOperation { get canRetryWrite(): boolean { return true; } + + get bypassPinningCheck(): boolean { + return false; + } } export function defineAspects( diff --git a/src/sdam/common.ts b/src/sdam/common.ts index 80ff92fa13b..1e00ca93641 100644 --- a/src/sdam/common.ts +++ b/src/sdam/common.ts @@ -17,7 
+17,8 @@ export const TopologyType = Object.freeze({ ReplicaSetNoPrimary: 'ReplicaSetNoPrimary', ReplicaSetWithPrimary: 'ReplicaSetWithPrimary', Sharded: 'Sharded', - Unknown: 'Unknown' + Unknown: 'Unknown', + LoadBalanced: 'LoadBalanced' } as const); /** @public */ @@ -36,7 +37,8 @@ export const ServerType = Object.freeze({ RSArbiter: 'RSArbiter', RSOther: 'RSOther', RSGhost: 'RSGhost', - Unknown: 'Unknown' + Unknown: 'Unknown', + LoadBalancer: 'LoadBalancer' } as const); /** @public */ diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 76acdcd4613..77e008de0fe 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -23,7 +23,8 @@ import { STATE_CLOSING, STATE_CONNECTING, STATE_CONNECTED, - ClusterTime + ClusterTime, + TopologyType } from './common'; import { MongoError, @@ -43,6 +44,7 @@ import { CommandOptions, APM_EVENTS } from '../cmap/connection'; +import type { Long } from '../bson'; import type { Topology } from './topology'; import type { ServerHeartbeatFailedEvent, @@ -50,7 +52,7 @@ import type { ServerHeartbeatSucceededEvent } from './events'; import type { ClientSession } from '../sessions'; -import type { Document, Long } from '../bson'; +import type { Document } from '../bson'; import type { AutoEncrypter } from '../deps'; import type { ServerApi } from '../mongo_client'; import { TypedEventEmitter } from '../mongo_types'; @@ -151,6 +153,8 @@ export class Server extends TypedEventEmitter { this.clusterTime = clusterTime; }); + if (this.loadBalanced) return; + // create the monitor this[kMonitor] = new Monitor(this, this.s.options); @@ -192,6 +196,10 @@ export class Server extends TypedEventEmitter { } } + get loadBalanced(): boolean { + return this.s.topology.description.type === TopologyType.LoadBalanced; + } + /** * Initiate server connect */ @@ -201,7 +209,16 @@ export class Server extends TypedEventEmitter { } stateTransition(this, STATE_CONNECTING); - this[kMonitor].connect(); + + // If in load balancer mode we automatically set the 
server to + // a load balancer. It never transitions out of this state and + // has no monitor. + if (!this.loadBalanced) { + this[kMonitor].connect(); + } else { + stateTransition(this, STATE_CONNECTED); + this.emit(Server.CONNECT, this); + } } /** Destroy the server connection */ @@ -219,7 +236,10 @@ export class Server extends TypedEventEmitter { stateTransition(this, STATE_CLOSING); - this[kMonitor].close(); + if (!this.loadBalanced) { + this[kMonitor].close(); + } + this.s.pool.close(options, err => { stateTransition(this, STATE_CLOSED); this.emit('closed'); @@ -234,7 +254,9 @@ export class Server extends TypedEventEmitter { * this will be a no-op. */ requestCheck(): void { - this[kMonitor].requestCheck(); + if (!this.loadBalanced) { + this[kMonitor].requestCheck(); + } } /** @@ -281,19 +303,43 @@ export class Server extends TypedEventEmitter { return; } - this.s.pool.withConnection((err, conn, cb) => { - if (err || !conn) { - markServerUnknown(this, err); - return cb(err); - } + const session = finalOptions.session; + const conn = session?.pinnedConnection; + + // NOTE: This is a hack! We can't retrieve the connections used for executing an operation + // (and prevent them from being checked back in) at the point of operation execution. 
+ // This should be considered as part of the work for NODE-2882 + if (session && conn == null && this.loadBalanced && isPinnableCommand(cmd, session)) { + this.s.pool.checkOut((err, checkedOut) => { + if (err || checkedOut == null) { + if (callback) return callback(err); + return; + } - conn.command( - ns, - cmd, - finalOptions, - makeOperationHandler(this, conn, cmd, finalOptions, cb) as Callback - ); - }, callback); + session.pin(checkedOut); + this.command(ns, cmd, finalOptions, callback as Callback); + }); + + return; + } + + this.s.pool.withConnection( + conn, + (err, conn, cb) => { + if (err || !conn) { + markServerUnknown(this, err); + return cb(err); + } + + conn.command( + ns, + cmd, + finalOptions, + makeOperationHandler(this, conn, cmd, finalOptions, cb) as Callback + ); + }, + callback + ); } /** @@ -306,14 +352,23 @@ export class Server extends TypedEventEmitter { return; } - this.s.pool.withConnection((err, conn, cb) => { - if (err || !conn) { - markServerUnknown(this, err); - return cb(err); - } + this.s.pool.withConnection( + undefined, + (err, conn, cb) => { + if (err || !conn) { + markServerUnknown(this, err); + return cb(err); + } - conn.query(ns, cmd, options, makeOperationHandler(this, conn, cmd, options, cb) as Callback); - }, callback); + conn.query( + ns, + cmd, + options, + makeOperationHandler(this, conn, cmd, options, cb) as Callback + ); + }, + callback + ); } /** @@ -331,19 +386,24 @@ export class Server extends TypedEventEmitter { return; } - this.s.pool.withConnection((err, conn, cb) => { - if (err || !conn) { - markServerUnknown(this, err); - return cb(err); - } + const conn = options.session?.pinnedConnection; + this.s.pool.withConnection( + conn, + (err, conn, cb) => { + if (err || !conn) { + markServerUnknown(this, err); + return cb(err); + } - conn.getMore( - ns, - cursorId, - options, - makeOperationHandler(this, conn, {}, options, cb) as Callback - ); - }, callback); + conn.getMore( + ns, + cursorId, + options, + 
makeOperationHandler(this, conn, {}, options, cb) as Callback + ); + }, + callback + ); } /** @@ -364,19 +424,24 @@ export class Server extends TypedEventEmitter { return; } - this.s.pool.withConnection((err, conn, cb) => { - if (err || !conn) { - markServerUnknown(this, err); - return cb(err); - } + const conn = options.session?.pinnedConnection; + this.s.pool.withConnection( + conn, + (err, conn, cb) => { + if (err || !conn) { + markServerUnknown(this, err); + return cb(err); + } - conn.killCursors( - ns, - cursorIds, - options, - makeOperationHandler(this, conn, {}, undefined, cb) as Callback - ); - }, callback); + conn.killCursors( + ns, + cursorIds, + options, + makeOperationHandler(this, conn, {}, undefined, cb) as Callback + ); + }, + callback + ); } } @@ -397,9 +462,10 @@ Object.defineProperty(Server.prototype, 'clusterTime', { function supportsRetryableWrites(server: Server) { return ( - server.description.maxWireVersion >= 6 && - server.description.logicalSessionTimeoutMinutes && - server.description.type !== ServerType.Standalone + server.loadBalanced || + (server.description.maxWireVersion >= 6 && + server.description.logicalSessionTimeoutMinutes && + server.description.type !== ServerType.Standalone) ); } @@ -413,6 +479,11 @@ function calculateRoundTripTime(oldRtt: number, duration: number): number { } function markServerUnknown(server: Server, error?: MongoError) { + // Load balancer servers can never be marked unknown. 
+ if (server.loadBalanced) { + return; + } + if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) { server[kMonitor].reset(); } @@ -427,7 +498,28 @@ function markServerUnknown(server: Server, error?: MongoError) { ); } +function isPinnableCommand(cmd: Document, session?: ClientSession): boolean { + if (session) { + return ( + session.inTransaction() || + 'aggregate' in cmd || + 'find' in cmd || + 'getMore' in cmd || + 'listCollections' in cmd || + 'listIndexes' in cmd + ); + } + + return false; +} + function connectionIsStale(pool: ConnectionPool, connection: Connection) { + if (connection.serviceId) { + return ( + connection.generation !== pool.serviceGenerations.get(connection.serviceId.toHexString()) + ); + } + return connection.generation !== pool.generation; } @@ -471,8 +563,13 @@ function makeOperationHandler( } if (!(err instanceof MongoNetworkTimeoutError) || isNetworkErrorBeforeHandshake(err)) { - markServerUnknown(server, err); - server.s.pool.clear(); + // In load balanced mode we never mark the server as unknown and always + // clear for the specific service id. 
+ + server.s.pool.clear(connection.serviceId); + if (!server.loadBalanced) { + markServerUnknown(server, err); + } } } else { // if pre-4.4 server, then add error label if its a retryable write error @@ -488,14 +585,20 @@ function makeOperationHandler( if (isSDAMUnrecoverableError(err)) { if (shouldHandleStateChangeError(server, err)) { if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) { - server.s.pool.clear(); + server.s.pool.clear(connection.serviceId); } - markServerUnknown(server, err); - process.nextTick(() => server.requestCheck()); + if (!server.loadBalanced) { + markServerUnknown(server, err); + process.nextTick(() => server.requestCheck()); + } } } } + + if (session && session.isPinned && err.hasErrorLabel('TransientTransactionError')) { + session.unpin({ force: true }); + } } callback(err, result); diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index 76f8ccff082..bd907b2c4b3 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -7,14 +7,16 @@ import type { MongoError } from '../error'; const WRITABLE_SERVER_TYPES = new Set([ ServerType.RSPrimary, ServerType.Standalone, - ServerType.Mongos + ServerType.Mongos, + ServerType.LoadBalancer ]); const DATA_BEARING_SERVER_TYPES = new Set([ ServerType.RSPrimary, ServerType.RSSecondary, ServerType.Mongos, - ServerType.Standalone + ServerType.Standalone, + ServerType.LoadBalancer ]); /** @public */ @@ -36,6 +38,9 @@ export interface ServerDescriptionOptions { /** The topologyVersion */ topologyVersion?: TopologyVersion; + + /** If the client is in load balancing mode. */ + loadBalanced?: boolean; } /** @@ -90,7 +95,7 @@ export class ServerDescription { this._hostAddress = address; this.address = this._hostAddress.toString(); } - this.type = parseServerType(ismaster); + this.type = parseServerType(ismaster, options); this.hosts = ismaster?.hosts?.map((host: string) => host.toLowerCase()) ?? 
[]; this.passives = ismaster?.passives?.map((host: string) => host.toLowerCase()) ?? []; this.arbiters = ismaster?.arbiters?.map((host: string) => host.toLowerCase()) ?? []; @@ -206,7 +211,14 @@ export class ServerDescription { } // Parses an `ismaster` message and determines the server type -export function parseServerType(ismaster?: Document): ServerType { +export function parseServerType( + ismaster?: Document, + options?: ServerDescriptionOptions +): ServerType { + if (options?.loadBalanced) { + return ServerType.LoadBalancer; + } + if (!ismaster || !ismaster.ok) { return ServerType.Unknown; } diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 9f85c7686db..dfa9beca44d 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -207,6 +207,10 @@ function knownFilter(server: ServerDescription): boolean { return server.type !== ServerType.Unknown; } +function loadBalancerFilter(server: ServerDescription): boolean { + return server.type === ServerType.LoadBalancer; +} + /** * Returns a function which selects servers based on a provided read preference * @@ -232,6 +236,10 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se ); } + if (topologyDescription.type === TopologyType.LoadBalanced) { + return servers.filter(loadBalancerFilter); + } + if (topologyDescription.type === TopologyType.Unknown) { return []; } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 5f6171acf03..f2f4ab03db6 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions { srvPoller?: SrvPoller; /** Indicates that a client should directly connect to a node without attempting to discover its topology type */ directConnection: boolean; + loadBalanced: boolean; metadata: ClientMetadata; /** MongoDB server API version */ serverApi?: ServerApi; @@ -248,6 +249,7 @@ export class Topology extends TypedEventEmitter { 
retryWrites: DEFAULT_OPTIONS.get('retryWrites'), serverSelectionTimeoutMS: DEFAULT_OPTIONS.get('serverSelectionTimeoutMS'), directConnection: DEFAULT_OPTIONS.get('directConnection'), + loadBalanced: DEFAULT_OPTIONS.get('loadBalanced'), metadata: DEFAULT_OPTIONS.get('metadata'), monitorCommands: DEFAULT_OPTIONS.get('monitorCommands'), tls: DEFAULT_OPTIONS.get('tls'), @@ -325,7 +327,7 @@ export class Topology extends TypedEventEmitter { detectSrvRecords: ev => this.detectSrvRecords(ev) }; - if (options.srvHost) { + if (options.srvHost && !options.loadBalanced) { this.s.srvPoller = options.srvPoller ?? new SrvPoller({ @@ -379,6 +381,10 @@ export class Topology extends TypedEventEmitter { return this.s.description; } + get loadBalanced(): boolean { + return this.s.options.loadBalanced; + } + get capabilities(): ServerCapabilities { return new ServerCapabilities(this.lastIsMaster()); } @@ -411,7 +417,19 @@ export class Topology extends TypedEventEmitter { ); // connect all known servers, then attempt server selection to connect - connectServers(this, Array.from(this.s.description.servers.values())); + const serverDescriptions = Array.from(this.s.description.servers.values()); + connectServers(this, serverDescriptions); + + // In load balancer mode we need to fake a server description getting + // emitted from the monitor, since the monitor doesn't exist. + if (this.s.options.loadBalanced) { + serverDescriptions.forEach(description => { + const newDescription = new ServerDescription(description.hostAddress, undefined, { + loadBalanced: this.s.options.loadBalanced + }); + this.serverUpdateHandler(newDescription); + }); + } const readPreference = options.readPreference ?? 
ReadPreference.primary; this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => { @@ -610,7 +628,7 @@ export class Topology extends TypedEventEmitter { * @returns Whether sessions are supported on the current topology */ hasSessionSupport(): boolean { - return this.description.logicalSessionTimeoutMinutes != null; + return this.loadBalanced || this.description.logicalSessionTimeoutMinutes != null; } /** Start a logical session */ @@ -830,6 +848,10 @@ function topologyTypeFromOptions(options?: TopologyOptions) { return TopologyType.ReplicaSetNoPrimary; } + if (options?.loadBalanced) { + return TopologyType.LoadBalanced; + } + return TopologyType.Unknown; } diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 4e9abdbf1a4..e08d9e0d121 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -85,7 +85,13 @@ export class TopologyDescription { // determine server compatibility for (const serverDescription of this.servers.values()) { - if (serverDescription.type === ServerType.Unknown) continue; + // Load balancer mode is always compatible. 
+ if ( + serverDescription.type === ServerType.Unknown || + serverDescription.type === ServerType.LoadBalancer + ) { + continue; + } if (serverDescription.minWireVersion > MAX_SUPPORTED_WIRE_VERSION) { this.compatible = false; diff --git a/src/sessions.ts b/src/sessions.ts index 2b5324061f3..8f3f1ce916d 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -2,7 +2,7 @@ import { PromiseProvider } from './promise_provider'; import { Binary, Long, Timestamp, Document } from './bson'; import { ReadPreference } from './read_preference'; import { isTransactionCommand, TxnState, Transaction, TransactionOptions } from './transactions'; -import { resolveClusterTime, ClusterTime } from './sdam/common'; +import { resolveClusterTime, ClusterTime, TopologyType } from './sdam/common'; import { isSharded } from './cmap/wire_protocol/shared'; import { MongoError, @@ -11,7 +11,8 @@ import { MongoWriteConcernError, MONGODB_ERROR_CODES, MongoDriverError, - MongoServerError + MongoServerError, + AnyError } from './error'; import { now, @@ -28,6 +29,8 @@ import { executeOperation } from './operations/execute_operation'; import { RunAdminCommandOperation } from './operations/run_command'; import type { AbstractCursor } from './cursor/abstract_cursor'; import type { CommandOptions } from './cmap/connection'; +import { Connection } from './cmap/connection'; +import { ConnectionPoolMetrics } from './cmap/metrics'; import type { WriteConcern } from './write_concern'; import { TypedEventEmitter } from './mongo_types'; import { ReadConcernLevel } from './read_concern'; @@ -79,6 +82,18 @@ const kServerSession = Symbol('serverSession'); const kSnapshotTime = Symbol('snapshotTime'); /** @internal */ const kSnapshotEnabled = Symbol('snapshotEnabled'); +/** @internal */ +const kPinnedConnection = Symbol('pinnedConnection'); + +/** @public */ +export interface EndSessionOptions { + /** + * An optional error which caused the call to end this session + * @internal + */ + error?: AnyError; + force?: 
boolean; +} /** * A class representing a client session on the server @@ -107,6 +122,8 @@ export class ClientSession extends TypedEventEmitter { [kSnapshotTime]?: Timestamp; /** @internal */ [kSnapshotEnabled] = false; + /** @internal */ + [kPinnedConnection]?: Connection; /** * Create a client session. @@ -181,6 +198,41 @@ export class ClientSession extends TypedEventEmitter { return this[kSnapshotEnabled]; } + get loadBalanced(): boolean { + return this.topology.description.type === TopologyType.LoadBalanced; + } + + /** @internal */ + get pinnedConnection(): Connection | undefined { + return this[kPinnedConnection]; + } + + /** @internal */ + pin(conn: Connection): void { + if (this[kPinnedConnection]) { + throw TypeError('Cant pin multiple connections to the same session'); + } + + this[kPinnedConnection] = conn; + conn.emit( + Connection.PINNED, + this.inTransaction() ? ConnectionPoolMetrics.TXN : ConnectionPoolMetrics.CURSOR + ); + } + + /** @internal */ + unpin(options?: { force?: boolean; error?: AnyError }): void { + if (this.loadBalanced) { + return maybeClearPinnedConnection(this, options); + } + + this.transaction.unpinServer(); + } + + get isPinned(): boolean { + return this.loadBalanced ? !!this[kPinnedConnection] : this.transaction.isPinned; + } + /** * Ends this session on the server * @@ -189,21 +241,25 @@ export class ClientSession extends TypedEventEmitter { */ endSession(): Promise; endSession(callback: Callback): void; - endSession(options: Record): Promise; - endSession(options: Record, callback: Callback): void; + endSession(options: EndSessionOptions): Promise; + endSession(options: EndSessionOptions, callback: Callback): void; endSession( - options?: Record | Callback, + options?: EndSessionOptions | Callback, callback?: Callback ): void | Promise { if (typeof options === 'function') (callback = options), (options = {}); - options = options ?? {}; + const _options = options ?? 
({} as EndSessionOptions); + if (_options.force == null) _options.force = true; return maybePromise(callback, done => { if (this.hasEnded) { + maybeClearPinnedConnection(this, _options); return done(); } const completeEndSession = () => { + maybeClearPinnedConnection(this, _options); + // release the server session back to the pool this.sessionPool.release(this.serverSession); this[kServerSession] = undefined; @@ -290,6 +346,10 @@ export class ClientSession extends TypedEventEmitter { throw new MongoDriverError('Transaction already in progress'); } + if (this.isPinned && this.transaction.isCommitted) { + this.unpin(); + } + const topologyMaxWireVersion = maxWireVersion(this.topology); if ( isSharded(this.topology) && @@ -397,6 +457,49 @@ function isUnknownTransactionCommitResult(err: MongoError) { ); } +export function maybeClearPinnedConnection( + session: ClientSession, + options?: EndSessionOptions +): void { + // unpin a connection if it has been pinned + const conn = session[kPinnedConnection]; + const error = options?.error; + + if ( + session.inTransaction() && + error && + error instanceof MongoError && + error.hasErrorLabel('TransientTransactionError') + ) { + return; + } + + // NOTE: the spec talks about what to do on a network error only, but the tests seem to + // validate that we don't unpin on _all_ errors? + if (conn && (options?.error == null || options?.force)) { + const servers = Array.from(session.topology.s.servers.values()); + if (servers.length === 0) { + // This can happen if the client is closed when the connection is still pinned + // NOTE: we don't usually do this, we could instead throw an error? + conn.destroy(); + + session[kPinnedConnection] = undefined; + return; + } + + const loadBalancer = servers[0]; + loadBalancer.s.pool.checkIn(conn); + conn.emit( + Connection.UNPINNED, + session.transaction.state !== TxnState.NO_TRANSACTION + ? 
ConnectionPoolMetrics.TXN + : ConnectionPoolMetrics.CURSOR + ); + + session[kPinnedConnection] = undefined; + } +} + function isMaxTimeMSExpiredError(err: MongoError) { if (err == null || !(err instanceof MongoServerError)) { return false; @@ -579,6 +682,10 @@ function endTransaction(session: ClientSession, commandName: string, callback: C function commandHandler(e?: MongoError, r?: Document) { if (commandName !== 'commitTransaction') { session.transaction.transition(TxnState.TRANSACTION_ABORTED); + if (session.loadBalanced) { + maybeClearPinnedConnection(session, { force: false }); + } + // The spec indicates that we should ignore all errors on `abortTransaction` return callback(); } @@ -595,12 +702,13 @@ function endTransaction(session: ClientSession, commandName: string, callback: C e.addErrorLabel('UnknownTransactionCommitResult'); // per txns spec, must unpin session in this case - session.transaction.unpinServer(); + session.unpin({ error: e }); } } else if (e.hasErrorLabel('TransientTransactionError')) { - session.transaction.unpinServer(); + session.unpin({ error: e }); } } + callback(e, r); } @@ -614,14 +722,20 @@ function endTransaction(session: ClientSession, commandName: string, callback: C session.topology, new RunAdminCommandOperation(undefined, command, { session, - readPreference: ReadPreference.primary + readPreference: ReadPreference.primary, + bypassPinningCheck: true }), (err, reply) => { + if (command.abortTransaction) { + // always unpin on abort regardless of command outcome + session.unpin(); + } + if (err && isRetryableError(err as MongoError)) { // SPEC-1185: apply majority write concern when retrying commitTransaction if (command.commitTransaction) { // per txns spec, must unpin session in this case - session.transaction.unpinServer(); + session.unpin({ force: true }); command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, { w: 'majority' @@ -632,7 +746,8 @@ function endTransaction(session: ClientSession, 
commandName: string, callback: C session.topology, new RunAdminCommandOperation(undefined, command, { session, - readPreference: ReadPreference.primary + readPreference: ReadPreference.primary, + bypassPinningCheck: true }), (_err, _reply) => commandHandler(_err as MongoError, _reply) ); @@ -731,7 +846,7 @@ export class ServerSessionPool { while (this.sessions.length) { const session = this.sessions.shift(); - if (session && !session.hasTimedOut(sessionTimeoutMinutes)) { + if (session && (this.topology.loadBalanced || !session.hasTimedOut(sessionTimeoutMinutes))) { return session; } } @@ -748,6 +863,11 @@ export class ServerSessionPool { */ release(session: ServerSession): void { const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes; + + if (this.topology.loadBalanced) { + this.sessions.unshift(session); + } + if (!sessionTimeoutMinutes) { return; } diff --git a/src/transactions.ts b/src/transactions.ts index 95c05da415c..f43169a808c 100644 --- a/src/transactions.ts +++ b/src/transactions.ts @@ -77,7 +77,6 @@ export class Transaction { /** Create a transaction @internal */ constructor(options?: TransactionOptions) { options = options ?? 
{}; - this.state = TxnState.NO_TRANSACTION; this.options = {}; @@ -133,9 +132,19 @@ export class Transaction { TxnState.STARTING_TRANSACTION, TxnState.TRANSACTION_IN_PROGRESS ]; + return activeStates.includes(this.state); } + get isCommitted(): boolean { + const committedStates: TxnState[] = [ + TxnState.TRANSACTION_COMMITTED, + TxnState.TRANSACTION_COMMITTED_EMPTY, + TxnState.TRANSACTION_ABORTED + ]; + + return committedStates.includes(this.state); + } /** * Transition the transaction in the state machine * @internal diff --git a/src/utils.ts b/src/utils.ts index 562818e0db5..2362675bb6e 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -18,6 +18,7 @@ import type { MongoClient } from './mongo_client'; import type { CommandOperationOptions, OperationParent } from './operations/command'; import { ReadPreference } from './read_preference'; import { URL } from 'url'; +import { MAX_SUPPORTED_WIRE_VERSION } from './cmap/wire_protocol/constants'; /** * MongoDB Driver style callback @@ -663,6 +664,9 @@ export function uuidV4(): Buffer { */ export function maxWireVersion(topologyOrServer?: Connection | Topology | Server): number { if (topologyOrServer) { + if (topologyOrServer.loadBalanced) { + return MAX_SUPPORTED_WIRE_VERSION; + } if (topologyOrServer.ismaster) { return topologyOrServer.ismaster.maxWireVersion; } diff --git a/test/functional/change_stream_spec.test.js b/test/functional/change_stream_spec.test.js index d124f46c82b..713a65158a2 100644 --- a/test/functional/change_stream_spec.test.js +++ b/test/functional/change_stream_spec.test.js @@ -2,10 +2,11 @@ const path = require('path'); const chai = require('chai'); -const loadSpecTests = require('../spec').loadSpecTests; +const { loadSpecTests } = require('../spec'); +const { runUnifiedTest } = require('./unified-spec-runner/runner'); const camelCase = require('lodash.camelcase'); -const setupDatabase = require('./shared').setupDatabase; -const delay = require('./shared').delay; +const { setupDatabase } = 
require('./shared'); +const { delay } = require('./shared'); const expect = chai.expect; describe('Change Stream Spec', function () { @@ -280,3 +281,19 @@ describe('Change Stream Spec', function () { return () => target[command].apply(target, args); } }); + +describe('Change Streams Spec Unified Tests', function () { + for (const changeStreamTest of loadSpecTests(path.join('change-stream', 'unified'))) { + expect(changeStreamTest).to.exist; + context(String(changeStreamTest.description), function () { + for (const test of changeStreamTest.tests) { + it(String(test.description), { + metadata: { sessions: { skipLeakTests: true } }, + test: async function () { + await runUnifiedTest(this, changeStreamTest, test); + } + }); + } + }); + } +}); diff --git a/test/functional/retryable_writes.test.js b/test/functional/retryable_writes.test.js index 1c79a06fcd6..79964fd2820 100644 --- a/test/functional/retryable_writes.test.js +++ b/test/functional/retryable_writes.test.js @@ -99,8 +99,8 @@ function executeScenarioTest(test, ctx) { /expected false to be true/ ); if (hasResult) expect(err.result).to.matchMongoSpec(test.outcome.result); - if (errorLabelsContain) expect(err.errorLabels).to.have.members(errorLabelsContain); - if (errorLabelsOmit) expect(err.errorLabels).to.not.have.members(errorLabelsOmit); + if (errorLabelsContain) expect(err.errorLabels).to.containSubset(errorLabelsContain); + if (errorLabelsOmit) expect(err.errorLabels).to.not.containSubset(errorLabelsOmit); }); } else if (test.outcome.result) { const expected = test.outcome.result; diff --git a/test/functional/spec-runner/context.js b/test/functional/spec-runner/context.js index a696d5f4aa6..04510637cae 100644 --- a/test/functional/spec-runner/context.js +++ b/test/functional/spec-runner/context.js @@ -124,11 +124,14 @@ class TestRunnerContext { targetedFailPoint(options) { const session = options.session; const failPoint = options.failPoint; - expect(session.transaction.isPinned).to.be.true; + 
expect(session.isPinned).to.be.true; return new Promise((resolve, reject) => { - const server = session.transaction.server; - server.command(ns('admin.$cmd'), failPoint, undefined, err => { + const serverOrConnection = session.loadBalanced + ? session.pinnedConnection + : session.transaction.server; + + serverOrConnection.command(ns('admin.$cmd'), failPoint, undefined, err => { if (err) return reject(err); this.appliedFailPoints.push(failPoint); diff --git a/test/functional/spec-runner/index.js b/test/functional/spec-runner/index.js index 7a30de025d8..b9e18ca38fa 100644 --- a/test/functional/spec-runner/index.js +++ b/test/functional/spec-runner/index.js @@ -193,10 +193,15 @@ function prepareDatabaseForSuite(suite, context) { .admin() .command({ killAllSessions: [] }) .catch(err => { - if (err.message.match(/no such (cmd|command)/) || err.code === 11601) { + if ( + err.message.match(/no such (cmd|command)/) || + err.message.match(/Failed to kill on some hosts/) || + err.code === 11601 + ) { return; } + console.log('err', err); throw err; }); diff --git a/test/functional/transactions.test.js b/test/functional/transactions.test.js index 170047258fe..36c9ed304ef 100644 --- a/test/functional/transactions.test.js +++ b/test/functional/transactions.test.js @@ -70,14 +70,14 @@ class TransactionsRunnerContext extends TestRunnerContext { expect(options).to.have.property('session'); const session = options.session; - expect(session.transaction.isPinned).to.be.true; + expect(session.isPinned).to.be.true; } assertSessionUnpinned(options) { expect(options).to.have.property('session'); const session = options.session; - expect(session.transaction.isPinned).to.be.false; + expect(session.isPinned).to.be.false; } } diff --git a/test/functional/unified-spec-runner/entities.ts b/test/functional/unified-spec-runner/entities.ts index a1adff3e9f3..76a6eaf1a74 100644 --- a/test/functional/unified-spec-runner/entities.ts +++ b/test/functional/unified-spec-runner/entities.ts @@ -32,7 +32,7 
@@ import type { } from '../../../src/cmap/command_monitoring_events'; import { patchCollectionOptions, patchDbOptions } from './unified-utils'; import { expect } from 'chai'; -import { TestConfiguration } from './runner'; +import { TestConfiguration, trace } from './runner'; interface UnifiedChangeStream extends ChangeStream { eventCollector: InstanceType; @@ -166,7 +166,7 @@ export class UnifiedMongoClient extends MongoClient { export class FailPointMap extends Map { async enableFailPoint( - addressOrClient: HostAddress | UnifiedMongoClient, + addressOrClient: string | HostAddress | UnifiedMongoClient, failPoint: Document ): Promise { let client: MongoClient; @@ -294,18 +294,28 @@ export class EntitiesMap extends Map { async cleanup(): Promise { await this.failPoints.disableFailPoints(); + + trace('closeCursors'); for (const [, cursor] of this.mapOf('cursor')) { await cursor.close(); } + + trace('closeStreams'); for (const [, stream] of this.mapOf('stream')) { await stream.close(); } + + trace('endSessions'); for (const [, session] of this.mapOf('session')) { await session.endSession({ force: true }); } + + trace('closeClient'); for (const [, client] of this.mapOf('client')) { await client.close(); } + + trace('clear'); this.clear(); } diff --git a/test/functional/unified-spec-runner/operations.ts b/test/functional/unified-spec-runner/operations.ts index a4dda6f5ed8..bab430ba99a 100644 --- a/test/functional/unified-spec-runner/operations.ts +++ b/test/functional/unified-spec-runner/operations.ts @@ -150,12 +150,12 @@ operations.set('assertSessionNotDirty', async ({ entities, operation }) => { operations.set('assertSessionPinned', async ({ entities, operation }) => { const session = entities.getEntity('session', operation.arguments.session); - expect(session.transaction.isPinned).to.be.true; + expect(session.isPinned, 'session should be pinned').to.be.true; }); operations.set('assertSessionUnpinned', async ({ entities, operation }) => { const session = 
entities.getEntity('session', operation.arguments.session); - expect(session.transaction.isPinned).to.be.false; + expect(session.isPinned, 'session should be unpinned').to.be.false; }); operations.set('assertSessionTransactionState', async ({ entities, operation }) => { @@ -180,8 +180,9 @@ operations.set('assertNumberConnectionsCheckedOut', async ({ entities, operation const pool = server.s.pool; return count + pool.currentCheckedOutCount; }, 0); - // TODO: Durran: Fix in NODE-3011 - expect(checkedOutConnections || 0).to.equal(operation.arguments.connections); + + await Promise.resolve(); // wait one tick + expect(checkedOutConnections).to.equal(operation.arguments.connections); }); operations.set('bulkWrite', async ({ entities, operation }) => { @@ -245,8 +246,9 @@ operations.set('createCollection', async ({ entities, operation }) => { operations.set('createFindCursor', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); + const session = entities.getEntity('session', operation.arguments.session, false); const { filter, sort, batchSize, limit, let: vars } = operation.arguments; - const cursor = collection.find(filter, { sort, batchSize, limit, let: vars }); + const cursor = collection.find(filter, { session, sort, batchSize, limit, let: vars }); // The spec dictates that we create the cursor and force the find command // to execute, but don't move the cursor forward. hasNext() accomplishes // this. 
@@ -375,11 +377,12 @@ operations.set('startTransaction', async ({ entities, operation }) => { operations.set('targetedFailPoint', async ({ entities, operation }) => { const session = entities.getEntity('session', operation.arguments.session); - expect(session.transaction.isPinned, 'Session must be pinned for a targetedFailPoint').to.be.true; - await entities.failPoints.enableFailPoint( - session.transaction._pinnedServer.s.description.hostAddress, - operation.arguments.failPoint - ); + expect(session.isPinned, 'Session must be pinned for a targetedFailPoint').to.be.true; + const address = session.transaction.isPinned + ? session.transaction._pinnedServer.s.description.hostAddress + : session.pinnedConnection.address; + + await entities.failPoints.enableFailPoint(address, operation.arguments.failPoint); }); operations.set('delete', async ({ entities, operation }) => { diff --git a/test/functional/unified-spec-runner/runner.ts b/test/functional/unified-spec-runner/runner.ts index 6f55f49cd59..6bcb8b3f67e 100644 --- a/test/functional/unified-spec-runner/runner.ts +++ b/test/functional/unified-spec-runner/runner.ts @@ -7,6 +7,7 @@ import { ns } from '../../../src/utils'; import { executeOperationAndCheck } from './operations'; import { matchesEvents } from './match'; import { satisfies as semverSatisfies } from 'semver'; +import { MongoClient } from '../../../src/mongo_client'; export type TestConfiguration = InstanceType< typeof import('../../tools/runner/config')['TestConfiguration'] @@ -15,6 +16,25 @@ interface MongoDBMochaTestContext extends Mocha.Context { configuration: TestConfiguration; } +export function trace(message: string): void { + if (process.env.UTR_TRACE) { + console.log(` > ${message}`); + } +} + +async function terminateOpenTransactions(client: MongoClient) { + // TODO: on sharded clusters this has to be run on each mongos + try { + await client.db().admin().command({ killAllSessions: [] }); + } catch (err) { + if (err.code === 11601 || err.code === 
13 || err.code === 59) { + return; + } + + throw err; + } +} + export async function runUnifiedTest( ctx: MongoDBMochaTestContext, unifiedSuite: uni.UnifiedSuite, @@ -51,8 +71,12 @@ export async function runUnifiedTest( let entities; try { + trace('\n starting test:'); await utilClient.connect(); + // terminate all sessions before each test suite + await terminateOpenTransactions(utilClient); + // Must fetch parameters before checking runOnRequirements ctx.configuration.parameters = await utilClient.db().admin().command({ getParameter: '*' }); @@ -63,6 +87,7 @@ export async function runUnifiedTest( ...(test.runOnRequirements ?? []) ]; + trace('satisfiesRequirements'); for (const requirement of allRequirements) { const met = await topologySatisfies(ctx.configuration, requirement, utilClient); if (!met) { @@ -75,15 +100,19 @@ export async function runUnifiedTest( // documents are specified, the test runner MUST create the collection with a "majority" write concern. // The test runner MUST use the internal MongoClient for these operations. 
if (unifiedSuite.initialData) { + trace('initialData'); for (const collData of unifiedSuite.initialData) { const db = utilClient.db(collData.databaseName); const collection = db.collection(collData.collectionName, { writeConcern: { w: 'majority' } }); + + trace('listCollections'); const collectionList = await db .listCollections({ name: collData.collectionName }) .toArray(); if (collectionList.length !== 0) { + trace('drop'); expect(await collection.drop()).to.be.true; } } @@ -95,16 +124,19 @@ export async function runUnifiedTest( }); if (!collData.documents?.length) { + trace('createCollection'); await db.createCollection(collData.collectionName, { writeConcern: { w: 'majority' } }); continue; } + trace('insertMany'); await collection.insertMany(collData.documents); } } + trace('createEntities'); entities = await EntitiesMap.createEntities(ctx.configuration, unifiedSuite.createEntities); // Workaround for SERVER-39704: @@ -125,7 +157,15 @@ export async function runUnifiedTest( } for (const operation of test.operations) { - await executeOperationAndCheck(operation, entities, utilClient); + trace(operation.name); + try { + await executeOperationAndCheck(operation, entities, utilClient); + } catch (err) { + // clean up all sessions on failed test, and rethrow + await terminateOpenTransactions(utilClient); + + throw err; + } } const clientCommandEvents = new Map(); diff --git a/test/functional/unified-spec-runner/schema.ts b/test/functional/unified-spec-runner/schema.ts index 659f0b6c328..e399c37764e 100644 --- a/test/functional/unified-spec-runner/schema.ts +++ b/test/functional/unified-spec-runner/schema.ts @@ -13,6 +13,7 @@ export interface OperationDescription { expectError?: ExpectedError; expectResult?: unknown; saveResultAsEntity?: string; + ignoreResultAndError?: boolean; } export interface UnifiedSuite { description: string; diff --git a/test/manual/load-balancer.test.js b/test/manual/load-balancer.test.js new file mode 100644 index 00000000000..966d876546d 
--- /dev/null +++ b/test/manual/load-balancer.test.js @@ -0,0 +1,49 @@ +'use strict'; +const { loadSpecTests } = require('../spec/index'); +const { runUnifiedTest } = require('../functional/unified-spec-runner/runner'); +const { expect } = require('chai'); + +const SKIP = [ + // Verified they use the same connection but the Node implementation executes + // a getMore before the killCursors even though the stream is immediately + // closed. + 'change streams pin to a connection', + 'errors during the initial connection hello are ignore', + + // NOTE: The following three tests are skipped pending a decision made on DRIVERS-1847, since + // pinning the connection on any getMore error is very awkward in node and likely results + // in sub-optimal pinning. + 'pinned connections are not returned after an network error during getMore', + 'pinned connections are not returned to the pool after a non-network error on getMore', + 'stale errors are ignored' +]; + +require('../functional/retryable_reads.test'); +require('../functional/retryable_writes.test'); +require('../functional/transactions.test'); +require('../functional/uri_options_spec.test'); +require('../functional/change_stream_spec.test'); +require('../functional/transactions.test'); +require('../functional/versioned-api.test'); +require('../unit/core/mongodb_srv.test'); +require('../unit/sdam/server_selection/spec.test'); + +describe('Load Balancer Spec Unified Tests', function () { + this.timeout(10000); + for (const loadBalancerTest of loadSpecTests('load-balancers')) { + expect(loadBalancerTest).to.exist; + context(String(loadBalancerTest.description), function () { + for (const test of loadBalancerTest.tests) { + const description = String(test.description); + if (!SKIP.includes(description)) { + it(description, { + metadata: { sessions: { skipLeakTests: true } }, + test: async function () { + await runUnifiedTest(this, loadBalancerTest, test); + } + }); + } + } + }); + } +}); diff --git a/test/tools/mock.js 
b/test/tools/mock.js index 0db171308a0..c0024476a89 100644 --- a/test/tools/mock.js +++ b/test/tools/mock.js @@ -7,8 +7,14 @@ const { DEFAULT_ISMASTER_36 // eslint-disable-next-line no-restricted-modules } = require('mongodb-mock-server'); +const { ObjectId } = require('bson'); const { HostAddress } = require('../../src/utils'); +const DEFAULT_HELLO_50 = { + ...DEFAULT_ISMASTER_36, + serverId: new ObjectId() +}; + /** * @callback GetHostAddress * @returns {import('../../src/mongo_client').HostAddress} @@ -71,5 +77,6 @@ module.exports = { createServer, cleanup, DEFAULT_ISMASTER, - DEFAULT_ISMASTER_36 + DEFAULT_ISMASTER_36, + DEFAULT_HELLO_50 }; diff --git a/test/tools/runner/config.js b/test/tools/runner/config.js index 1b02cfa929a..edcdc347222 100644 --- a/test/tools/runner/config.js +++ b/test/tools/runner/config.js @@ -38,13 +38,13 @@ class TestConfiguration { const url = new ConnectionString(uri); const { hosts } = url; const hostAddresses = hosts.map(HostAddress.fromString); - this.topologyType = context.topologyType; this.version = context.version; this.clientSideEncryption = context.clientSideEncryption; this.serverApi = context.serverApi; this.parameters = undefined; this.singleMongosLoadBalancerUri = context.singleMongosLoadBalancerUri; this.multiMongosLoadBalancerUri = context.multiMongosLoadBalancerUri; + this.topologyType = this.isLoadBalanced ? 
TopologyType.LoadBalanced : context.topologyType; this.options = { hosts, hostAddresses, @@ -62,6 +62,10 @@ class TestConfiguration { } } + get isLoadBalanced() { + return !!this.singleMongosLoadBalancerUri && !!this.multiMongosLoadBalancerUri; + } + writeConcern() { return { writeConcern: { w: 1 } }; } @@ -235,10 +239,20 @@ class TestConfiguration { let actualHostsString; if (options.useMultipleMongoses) { - expect(this.options.hostAddresses).to.have.length.greaterThan(1); - actualHostsString = this.options.hostAddresses.map(ha => ha.toString()).join(','); + if (this.isLoadBalanced) { + const multiUri = new ConnectionString(this.multiMongosLoadBalancerUri); + actualHostsString = multiUri.hosts[0].toString(); + } else { + expect(this.options.hostAddresses).to.have.length.greaterThan(1); + actualHostsString = this.options.hostAddresses.map(ha => ha.toString()).join(','); + } } else { - actualHostsString = this.options.hostAddresses[0].toString(); + if (this.isLoadBalanced) { + const singleUri = new ConnectionString(this.singleMongosLoadBalancerUri); + actualHostsString = singleUri.hosts[0].toString(); + } else { + actualHostsString = this.options.hostAddresses[0].toString(); + } } const connectionString = url.toString().replace(FILLER_HOST, actualHostsString); diff --git a/test/tools/runner/filters/mongodb_topology_filter.js b/test/tools/runner/filters/mongodb_topology_filter.js index cc72404d6e5..dafbed52d88 100755 --- a/test/tools/runner/filters/mongodb_topology_filter.js +++ b/test/tools/runner/filters/mongodb_topology_filter.js @@ -49,6 +49,8 @@ function topologyTypeToString(topologyType) { return 'replicaset'; } else if (topologyType === TopologyType.Sharded) { return 'sharded'; + } else if (topologyType === TopologyType.LoadBalanced) { + return 'load-balanced'; } return 'single'; diff --git a/test/tools/runner/index.js b/test/tools/runner/index.js index 08a51f90f88..82c7f6ef151 100644 --- a/test/tools/runner/index.js +++ b/test/tools/runner/index.js @@ -66,6 
+66,7 @@ before(function (_done) { // ); const options = MONGODB_API_VERSION ? { serverApi: MONGODB_API_VERSION } : {}; + const client = new MongoClient(MONGODB_URI, options); const done = err => client.close(err2 => _done(err || err2)); diff --git a/test/unit/cmap/connection.test.js b/test/unit/cmap/connection.test.js index ad88b347eae..42a0cdb975c 100644 --- a/test/unit/cmap/connection.test.js +++ b/test/unit/cmap/connection.test.js @@ -2,8 +2,9 @@ const mock = require('../../tools/mock'); const { connect } = require('../../../src/cmap/connect'); -const { Connection } = require('../../../src/cmap/connection'); +const { Connection, hasSessionSupport } = require('../../../src/cmap/connection'); const { expect } = require('chai'); +const { Socket } = require('net'); const { ns } = require('../../../src/utils'); const { getSymbolFrom } = require('../../tools/utils'); @@ -106,4 +107,49 @@ describe('Connection - unit/cmap', function () { done(); }); }); + + describe('.hasSessionSupport', function () { + let connection; + const stream = new Socket(); + + context('when logicalSessionTimeoutMinutes is preset', function () { + beforeEach(function () { + connection = new Connection(stream, { + hostAddress: server.hostAddress(), + logicalSessionTimeoutMinutes: 5 + }); + }); + + it('returns true', function () { + expect(hasSessionSupport(connection)).to.be.true; + }); + }); + + context('when logicalSessionTimeoutMinutes is not present', function () { + context('when in load balancing mode', function () { + beforeEach(function () { + connection = new Connection(stream, { + hostAddress: server.hostAddress(), + loadBalanced: true + }); + }); + + it('returns true', function () { + expect(hasSessionSupport(connection)).to.be.true; + }); + }); + + context('when not in load balancing mode', function () { + beforeEach(function () { + connection = new Connection(stream, { + hostAddress: server.hostAddress() + }); + }); + + it('returns false', function () { + 
expect(hasSessionSupport(connection)).to.be.false; + }); + }); + }); + }); }); diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index ebd11017f77..d21233c4454 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -73,6 +73,7 @@ describe('Connection Pool', function () { }); pool.withConnection( + undefined, (err, conn, cb) => { expect(err).to.not.exist; cb(); @@ -193,11 +194,15 @@ describe('Connection Pool', function () { pool.close(done); }; - pool.withConnection((err, conn, cb) => { - expect(err).to.exist; - expect(err).to.match(/closed/); - cb(err); - }, callback); + pool.withConnection( + undefined, + (err, conn, cb) => { + expect(err).to.exist; + expect(err).to.match(/closed/); + cb(err); + }, + callback + ); }); it('should return an error to the original callback', function (done) { @@ -216,10 +221,14 @@ describe('Connection Pool', function () { pool.close(done); }; - pool.withConnection((err, conn, cb) => { - expect(err).to.not.exist; - cb(new Error('my great error')); - }, callback); + pool.withConnection( + undefined, + (err, conn, cb) => { + expect(err).to.not.exist; + cb(new Error('my great error')); + }, + callback + ); }); it('should still manage a connection if no callback is provided', function (done) { @@ -243,13 +252,77 @@ describe('Connection Pool', function () { pool.close(done); }); - pool.withConnection((err, conn, cb) => { + pool.withConnection(undefined, (err, conn, cb) => { expect(err).to.not.exist; cb(); }); }); }); + describe.skip('#closeConnections', function () { + context('when the server id matches', function () { + let pool; + + beforeEach(() => { + pool = new ConnectionPool({ + minPoolSize: 1, + hostAddress: server.hostAddress() + }); + }); + + afterEach(done => { + pool.close(done); + }); + + it('closes the matching connections', function (done) { + const hello = mock.DEFAULT_HELLO_50; + server.setMessageHandler(request => { + const doc = 
request.document; + if (doc.ismaster) { + request.reply(hello); + } + }); + pool.on(ConnectionPool.CONNECTION_CLOSED, event => { + console.log('event', event); + done(); + }); + + const connection = pool.checkOut(); + pool.checkIn(connection); + pool.closeConnections(hello.serverId); + }); + }); + + context('when the server id does not match', function () { + let pool; + + beforeEach(() => { + pool = new ConnectionPool({ + minPoolSize: 3, + hostAddress: server.hostAddress() + }); + }); + + afterEach(done => { + pool.close(done); + }); + + it('does not close any connections', function (done) { + const hello = mock.DEFAULT_HELLO_50; + server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(hello); + } + }); + pool.closeConnections(hello.serverId); + process.nextTick(() => { + done(); + }); + }); + }); + }); + describe('spec tests', function () { const threads = new Map(); const connections = new Map(); diff --git a/test/unit/cmap/metrics.test.js b/test/unit/cmap/metrics.test.js new file mode 100644 index 00000000000..a4270fb533f --- /dev/null +++ b/test/unit/cmap/metrics.test.js @@ -0,0 +1,102 @@ +'use strict'; + +const { expect } = require('chai'); +const { ConnectionPoolMetrics } = require('../../../src/cmap/metrics'); + +describe('ConnectionPoolMetrics', function () { + describe('#constructor', function () { + const metrics = new ConnectionPoolMetrics(); + + it('defaults txnConnections to zero', function () { + expect(metrics.txnConnections).to.equal(0); + }); + + it('defaults cursorConnections to zero', function () { + expect(metrics.cursorConnections).to.equal(0); + }); + + it('defaults otherConnections to zero', function () { + expect(metrics.otherConnections).to.equal(0); + }); + }); + + describe('#info', function () { + const metrics = new ConnectionPoolMetrics(); + + it('returns the metrics information', function () { + expect(metrics.info()).to.equal( + 'connections in use by cursors: 0,' + + 'connections 
in use by transactions: 0,' + + 'connections in use by other operations: 0' + ); + }); + }); + + describe('#markPinned', function () { + const metrics = new ConnectionPoolMetrics(); + + context('when the type is TXN', function () { + before(function () { + metrics.markPinned(ConnectionPoolMetrics.TXN); + }); + + it('increments the txnConnections count', function () { + expect(metrics.txnConnections).to.equal(1); + }); + }); + + context('when the type is CURSOR', function () { + before(function () { + metrics.markPinned(ConnectionPoolMetrics.CURSOR); + }); + + it('increments the cursorConnections count', function () { + expect(metrics.cursorConnections).to.equal(1); + }); + }); + + context('when the type is OTHER', function () { + before(function () { + metrics.markPinned(ConnectionPoolMetrics.OTHER); + }); + + it('increments the otherConnections count', function () { + expect(metrics.otherConnections).to.equal(1); + }); + }); + }); + + describe('#markUnpinned', function () { + const metrics = new ConnectionPoolMetrics(); + + context('when the type is TXN', function () { + before(function () { + metrics.markUnpinned(ConnectionPoolMetrics.TXN); + }); + + it('increments the txnConnections count', function () { + expect(metrics.txnConnections).to.equal(-1); + }); + }); + + context('when the type is CURSOR', function () { + before(function () { + metrics.markUnpinned(ConnectionPoolMetrics.CURSOR); + }); + + it('increments the cursorConnections count', function () { + expect(metrics.cursorConnections).to.equal(-1); + }); + }); + + context('when the type is OTHER', function () { + before(function () { + metrics.markUnpinned(ConnectionPoolMetrics.OTHER); + }); + + it('increments the otherConnections count', function () { + expect(metrics.otherConnections).to.equal(-1); + }); + }); + }); +}); diff --git a/test/unit/cmap/stream_description.test.js b/test/unit/cmap/stream_description.test.js new file mode 100644 index 00000000000..8fb9d6706e8 --- /dev/null +++ 
b/test/unit/cmap/stream_description.test.js @@ -0,0 +1,56 @@ +'use strict'; + +const { StreamDescription } = require('../../../src/cmap/stream_description'); +const { expect } = require('chai'); + +describe('StreamDescription - unit/cmap', function () { + describe('.new', function () { + context('when options are provided', function () { + context('when logicalSessionTimeoutMinutes is in the options', function () { + const options = { logicalSessionTimeoutMinutes: 5 }; + const description = new StreamDescription('a:27017', options); + + it('sets the property', function () { + expect(description.logicalSessionTimeoutMinutes).to.eq(5); + }); + }); + + context('when logicalSessionTimeoutMinutes is not in the options', function () { + const description = new StreamDescription('a:27017', {}); + + it('sets logicalSessionTimeoutMinutes to undefined', function () { + expect(description.logicalSessionTimeoutMinutes).to.not.exist; + }); + }); + + context('when loadBalanced is in the options', function () { + const options = { loadBalanced: true }; + const description = new StreamDescription('a:27017', options); + + it('sets the property', function () { + expect(description.loadBalanced).to.be.true; + }); + }); + + context('when loadBalanced is not in the options', function () { + const description = new StreamDescription('a:27017', {}); + + it('sets loadBalanced to false', function () { + expect(description.loadBalanced).to.be.false; + }); + }); + }); + + context('when options are not provided', function () { + const description = new StreamDescription('a:27017'); + + it('defaults logicalSessionTimeoutMinutes to undefined', function () { + expect(description.logicalSessionTimeoutMinutes).to.not.exist; + }); + + it('defaults loadBalanced to false', function () { + expect(description.loadBalanced).to.be.false; + }); + }); + }); +}); diff --git a/test/unit/core/mongodb_srv.test.js b/test/unit/core/mongodb_srv.test.js index dd747534e3f..9edd963b2e0 100644 --- 
a/test/unit/core/mongodb_srv.test.js +++ b/test/unit/core/mongodb_srv.test.js @@ -55,6 +55,9 @@ describe('mongodb+srv', function () { expect(options).to.have.property('credentials'); expect(options.credentials.source).to.equal(testOptions.authSource); } + if (testOptions && testOptions.loadBalanced) { + expect(options).to.have.property('loadBalanced', testOptions.loadBalanced); + } if ( test[1].parsed_options && test[1].parsed_options.user && diff --git a/test/unit/core/response_test.js.test.js b/test/unit/core/response_test.js.test.js index 36177099b9f..b27fdf3ccb8 100644 --- a/test/unit/core/response_test.js.test.js +++ b/test/unit/core/response_test.js.test.js @@ -45,6 +45,8 @@ describe('Response', function () { }); } else if (doc.getMore) { request.reply(errdoc); + } else if (doc.killCursors) { + request.reply({ ok: 1 }); } }); diff --git a/test/unit/mongo_client_options.test.js b/test/unit/mongo_client_options.test.js index f6ea142272f..c38b61b63d4 100644 --- a/test/unit/mongo_client_options.test.js +++ b/test/unit/mongo_client_options.test.js @@ -80,6 +80,7 @@ describe('MongoOptions', function () { compressors: 'snappy', // TODO connectTimeoutMS: 123, directConnection: true, + loadBalanced: false, dbName: 'test', driverInfo: { name: 'MyDriver', platform: 'moonOS' }, family: 6, @@ -450,4 +451,32 @@ describe('MongoOptions', function () { ); }); }); + + context('when loadBalanced=true is in the URI', function () { + it('sets the option', function () { + const options = parseOptions('mongodb://a/?loadBalanced=true'); + expect(options.loadBalanced).to.be.true; + }); + + it('errors with multiple hosts', function () { + const parse = () => { + parseOptions('mongodb://a,b/?loadBalanced=true'); + }; + expect(parse).to.throw(/single host/); + }); + + it('errors with a replicaSet option', function () { + const parse = () => { + parseOptions('mongodb://a/?loadBalanced=true&replicaSet=test'); + }; + expect(parse).to.throw(/replicaSet/); + }); + + it('errors with a 
directConnection option', function () { + const parse = () => { + parseOptions('mongodb://a/?loadBalanced=true&directConnection=true'); + }; + expect(parse).to.throw(/directConnection/); + }); + }); }); diff --git a/test/unit/sdam/server_selection/spec.test.js b/test/unit/sdam/server_selection/spec.test.js index 5fd692ddd8b..09551f58a4f 100644 --- a/test/unit/sdam/server_selection/spec.test.js +++ b/test/unit/sdam/server_selection/spec.test.js @@ -3,7 +3,7 @@ const path = require('path'); const fs = require('fs'); const { Topology } = require('../../../../src/sdam/topology'); const { Server } = require('../../../../src/sdam/server'); -const { ServerType } = require('../../../../src/sdam/common'); +const { ServerType, TopologyType } = require('../../../../src/sdam/common'); const { ServerDescription } = require('../../../../src/sdam/server_description'); const { ReadPreference } = require('../../../../src/read_preference'); const { MongoServerSelectionError } = require('../../../../src/error'); @@ -150,10 +150,21 @@ function serverDescriptionFromDefinition(definition, hosts) { hosts = hosts || []; const serverType = definition.type; + if (serverType === ServerType.Unknown) { return new ServerDescription(definition.address); } + // There's no monitor in load balanced mode so no fake hello + // is needed. 
+ if (serverType === ServerType.LoadBalancer) { + const description = new ServerDescription(definition.address, undefined, { + loadBalanced: true + }); + delete description.lastUpdateTime; + return description; + } + const fakeIsMaster = { ok: 1, hosts }; if (serverType !== ServerType.Standalone && serverType !== ServerType.Mongos) { fakeIsMaster.setName = 'rs'; @@ -218,7 +229,8 @@ function executeServerSelectionTest(testDefinition, options, testDone) { const topologyOptions = { heartbeatFrequencyMS: testDefinition.heartbeatFrequencyMS, - monitorFunction: () => {} + monitorFunction: () => {}, + loadBalanced: topologyDescription.type === TopologyType.LoadBalanced }; const topology = new Topology(seedData.seedlist, topologyOptions); diff --git a/test/unit/sdam/spec.test.js b/test/unit/sdam/spec.test.js index 7bf2d1b4621..32974244edf 100644 --- a/test/unit/sdam/spec.test.js +++ b/test/unit/sdam/spec.test.js @@ -2,6 +2,7 @@ const fs = require('fs'); const path = require('path'); const { Topology } = require('../../../src/sdam/topology'); +const { TopologyType } = require('../../../src/sdam/common'); const { Server } = require('../../../src/sdam/server'); const { ServerDescription } = require('../../../src/sdam/server_description'); const sdamEvents = require('../../../src/sdam/events'); @@ -249,7 +250,6 @@ function executeSDAMTest(testData, testDone) { phase.responses.forEach(response => topology.serverUpdateHandler(new ServerDescription(response[0], response[1])) ); - phaseDone(); } else if (phase.applicationErrors) { eachAsyncSeries( @@ -272,6 +272,8 @@ function executeSDAMTest(testData, testDone) { phaseDone(); } ); + } else { + phaseDone(); } }, err => { @@ -283,7 +285,7 @@ function executeSDAMTest(testData, testDone) { } function withConnectionStubImpl(appError) { - return function (fn, callback) { + return function (conn, fn, callback) { const connectionPool = this; // we are stubbing `withConnection` on the `ConnectionPool` class const fakeConnection = { 
generation: @@ -353,6 +355,15 @@ function assertOutcomeExpectations(topology, events, outcome) { return; } + // Load balancer mode has no monitor ismaster response and + // only expects address and compatible to be set in the + // server description. + if (description.type === TopologyType.LoadBalanced) { + if (key !== 'address' && key !== 'compatible') { + return; + } + } + if (key === 'events') { const expectedEvents = convertOutcomeEvents(outcomeValue); expect(events).to.have.length(expectedEvents.length);