diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 72cc1a9420..ac857bf214 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -762,7 +762,7 @@ function cleanupCursor( } if (!session.inTransaction()) { - maybeClearPinnedConnection(session, { error }); + maybeClearPinnedConnection(session, { error, force: true }); } } @@ -779,7 +779,7 @@ function cleanupCursor( } if (!session.inTransaction()) { - maybeClearPinnedConnection(session, { error }); + maybeClearPinnedConnection(session, { error, force: true }); } } diff --git a/src/operations/aggregate.ts b/src/operations/aggregate.ts index 7d09495eeb..ceb7907b3e 100644 --- a/src/operations/aggregate.ts +++ b/src/operations/aggregate.ts @@ -79,6 +79,10 @@ export class AggregateOperation extends CommandOperation { } } + get isCursorCreating(): boolean { + return true; + } + get canRetryRead(): boolean { return !this.hasWriteStage; } diff --git a/src/operations/estimated_document_count.ts b/src/operations/estimated_document_count.ts index b1fffcf2cd..f6faa5dc86 100644 --- a/src/operations/estimated_document_count.ts +++ b/src/operations/estimated_document_count.ts @@ -28,6 +28,10 @@ export class EstimatedDocumentCountOperation extends CommandOperation { this.collectionName = collection.collectionName; } + get isCursorCreating(): boolean { + return true; + } + execute(server: Server, session: ClientSession, callback: Callback): void { if (maxWireVersion(server) < 12) { return this.executeLegacy(server, session, callback); diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 45fb0fed19..c348023910 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -4,6 +4,7 @@ import { isRetryableError, MONGODB_ERROR_CODES, MongoDriverError, + MongoNetworkError, MongoCompatibilityError, MongoServerError } from '../error'; @@ -181,16 +182,31 @@ function executeWithServerSelection( } // select a new server, and attempt to retry the operation - topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => { + topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => { if ( - err || + e || (operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) || (operation.hasAspect(Aspect.WRITE_OPERATION) && !supportsRetryableWrites(server)) ) { - callback(err); + callback(e); return; } + // If we have a cursor and the initial command fails with a network error, + // we can retry it on another connection. So we need to check it back in, clear the + // pool for the service id, and retry again. + if ( + err && + err instanceof MongoNetworkError && + server.loadBalanced && + session && + session.isPinned && + !session.inTransaction() && + operation.isCursorCreating + ) { + session.unpin({ force: true, forceClear: true }); + } + operation.execute(server, session, callback); }); } diff --git a/src/operations/find.ts b/src/operations/find.ts index 4f2b15b904..06c879aec8 100644 --- a/src/operations/find.ts +++ b/src/operations/find.ts @@ -101,6 +101,10 @@ export class FindOperation extends CommandOperation { this.filter = filter != null && filter._bsontype === 'ObjectID' ? { _id: filter } : filter; } + get isCursorCreating(): boolean { + return true; + } + execute(server: Server, session: ClientSession, callback: Callback): void { this.server = server; diff --git a/src/operations/find_one.ts b/src/operations/find_one.ts index c498fd82d2..198e37f407 100644 --- a/src/operations/find_one.ts +++ b/src/operations/find_one.ts @@ -21,6 +21,10 @@ export class FindOneOperation extends CommandOperation { this.query = query; } + get isCursorCreating(): boolean { + return true; + } + execute(server: Server, session: ClientSession, callback: Callback): void { const coll = this.collection; const query = this.query; diff --git a/src/operations/indexes.ts b/src/operations/indexes.ts index ed93aef564..55561eb6cf 100644 --- a/src/operations/indexes.ts +++ b/src/operations/indexes.ts @@ -383,6 +383,10 @@ export class ListIndexesOperation extends CommandOperation { this.collectionNamespace = collection.s.namespace; } + get isCursorCreating(): boolean { + return true; + } + execute(server: Server, session: ClientSession, callback: Callback): void { const serverWireVersion = maxWireVersion(server); if (serverWireVersion < LIST_INDEXES_WIRE_VERSION) { diff --git a/src/operations/list_collections.ts b/src/operations/list_collections.ts index 8512adf683..573725d94a 100644 --- a/src/operations/list_collections.ts +++ b/src/operations/list_collections.ts @@ -40,6 +40,10 @@ export class ListCollectionsOperation extends CommandOperation { } } + get isCursorCreating(): boolean { + return true; + } + execute(server: Server, session: ClientSession, callback: Callback): void { if (maxWireVersion(server) < LIST_COLLECTIONS_WIRE_VERSION) { let filter = this.filter; diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 0783ca8762..948a7ad036 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -89,6 +89,10 @@ export abstract class AbstractOperation { return this[kSession]; } + get isCursorCreating(): boolean { + return false; + } + get canRetryRead(): boolean { return true; } diff --git a/src/sessions.ts b/src/sessions.ts index 620c1d328c..e469ab92c2 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -95,6 +95,7 @@ export interface EndSessionOptions { */ error?: AnyError; force?: boolean; + forceClear?: boolean; } /** @@ -225,7 +226,7 @@ export class ClientSession extends TypedEventEmitter { } /** @internal */ - unpin(options?: { force?: boolean; error?: AnyError }): void { + unpin(options?: { force?: boolean; forceClear?: boolean; error?: AnyError }): void { if (this.loadBalanced) { return maybeClearPinnedConnection(this, options); } @@ -479,16 +480,23 @@ export function maybeClearPinnedConnection( // NOTE: the spec talks about what to do on a network error only, but the tests seem to // to validate that we don't unpin on _all_ errors? - if (conn && (options?.error == null || options?.force)) { + if (conn) { const servers = Array.from(session.topology.s.servers.values()); const loadBalancer = servers[0]; - loadBalancer.s.pool.checkIn(conn); - conn.emit( - Connection.UNPINNED, - session.transaction.state !== TxnState.NO_TRANSACTION - ? ConnectionPoolMetrics.TXN - : ConnectionPoolMetrics.CURSOR - ); + + if (options?.error == null || options?.force) { + loadBalancer.s.pool.checkIn(conn); + conn.emit( + Connection.UNPINNED, + session.transaction.state !== TxnState.NO_TRANSACTION + ? ConnectionPoolMetrics.TXN + : ConnectionPoolMetrics.CURSOR + ); + + if (options?.forceClear) { + loadBalancer.s.pool.clear(conn.serviceId); + } + } session[kPinnedConnection] = undefined; }