From 9deb6c1f1d39dec9043555395a3b3c5375142438 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 15 Mar 2022 14:42:09 -0400 Subject: [PATCH 1/8] move getTopology call into executeOperation --- src/admin.ts | 16 ++++----- src/collection.ts | 56 ++++++++++++++--------------- src/db.ts | 32 ++++++++--------- src/operations/execute_operation.ts | 45 +++++++++++++++++------ src/operations/indexes.ts | 2 +- src/operations/list_collections.ts | 2 +- src/utils.ts | 5 +++ 7 files changed, 89 insertions(+), 69 deletions(-) diff --git a/src/admin.ts b/src/admin.ts index b6576dbccc..78d9df17cf 100644 --- a/src/admin.ts +++ b/src/admin.ts @@ -14,7 +14,7 @@ import { ValidateCollectionOperation, ValidateCollectionOptions } from './operations/validate_collection'; -import { Callback, getTopology } from './utils'; +import type { Callback } from './utils'; /** @internal */ export interface AdminPrivate { @@ -83,7 +83,7 @@ export class Admin { options = Object.assign({ dbName: 'admin' }, options); return executeOperation( - getTopology(this.s.db), + this.s.db, new RunCommandOperation(this.s.db, command, options), callback ); @@ -207,7 +207,7 @@ export class Admin { options = Object.assign({ dbName: 'admin' }, options); return executeOperation( - getTopology(this.s.db), + this.s.db, new AddUserOperation(this.s.db, username, password, options), callback ); @@ -233,7 +233,7 @@ export class Admin { options = Object.assign({ dbName: 'admin' }, options); return executeOperation( - getTopology(this.s.db), + this.s.db, new RemoveUserOperation(this.s.db, username, options), callback ); @@ -263,7 +263,7 @@ export class Admin { options = options ?? {}; return executeOperation( - getTopology(this.s.db), + this.s.db, new ValidateCollectionOperation(this, collectionName, options), callback ); @@ -286,11 +286,7 @@ export class Admin { if (typeof options === 'function') (callback = options), (options = {}); options = options ?? {}; - return executeOperation( - getTopology(this.s.db), - new ListDatabasesOperation(this.s.db, options), - callback - ); + return executeOperation(this.s.db, new ListDatabasesOperation(this.s.db, options), callback); } /** diff --git a/src/collection.ts b/src/collection.ts index 81be9fe3ee..002d3335d9 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -296,7 +296,7 @@ export class Collection { } return executeOperation( - getTopology(this), + this, new InsertOneOperation( this as TODO_NODE_3286, doc, @@ -338,7 +338,7 @@ export class Collection { options = options ? Object.assign({}, options) : { ordered: true }; return executeOperation( - getTopology(this), + this, new InsertManyOperation( this as TODO_NODE_3286, docs, @@ -406,7 +406,7 @@ export class Collection { } return executeOperation( - getTopology(this), + this, new BulkWriteOperation( this as TODO_NODE_3286, operations as TODO_NODE_3286, @@ -453,7 +453,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new UpdateOneOperation( this as TODO_NODE_3286, filter, @@ -501,7 +501,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new ReplaceOneOperation( this as TODO_NODE_3286, filter, @@ -549,7 +549,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new UpdateManyOperation( this as TODO_NODE_3286, filter, @@ -583,7 +583,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new DeleteOneOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options)), callback ); @@ -623,7 +623,7 @@ export class Collection { } return executeOperation( - getTopology(this), + this, new DeleteManyOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options)), callback ); @@ -652,7 +652,7 @@ export class Collection { // Intentionally, we do not inherit options from parent for this operation. return executeOperation( - getTopology(this), + this, new RenameOperation(this as TODO_NODE_3286, newName, { ...options, readPreference: ReadPreference.PRIMARY @@ -679,7 +679,7 @@ export class Collection { options = options ?? {}; return executeOperation( - getTopology(this), + this, new DropCollectionOperation(this.s.db, this.collectionName, options), callback ); @@ -783,7 +783,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new OptionsOperation(this as TODO_NODE_3286, resolveOptions(this, options)), callback ); @@ -806,7 +806,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new IsCappedOperation(this as TODO_NODE_3286, resolveOptions(this, options)), callback ); @@ -857,7 +857,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new CreateIndexOperation( this as TODO_NODE_3286, this.collectionName, @@ -918,7 +918,7 @@ export class Collection { if (typeof options.maxTimeMS !== 'number') delete options.maxTimeMS; return executeOperation( - getTopology(this), + this, new CreateIndexesOperation( this as TODO_NODE_3286, this.collectionName, @@ -952,7 +952,7 @@ export class Collection { options.readPreference = ReadPreference.primary; return executeOperation( - getTopology(this), + this, new DropIndexOperation(this as TODO_NODE_3286, indexName, options), callback ); @@ -975,7 +975,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new DropIndexesOperation(this as TODO_NODE_3286, resolveOptions(this, options)), callback ); @@ -1013,7 +1013,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new IndexExistsOperation(this as TODO_NODE_3286, indexes, resolveOptions(this, options)), callback ); @@ -1036,7 +1036,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new IndexInformationOperation(this.s.db, this.collectionName, resolveOptions(this, options)), callback ); @@ -1058,7 +1058,7 @@ export class Collection { ): Promise | void { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new EstimatedDocumentCountOperation(this as TODO_NODE_3286, resolveOptions(this, options)), callback ); @@ -1118,7 +1118,7 @@ export class Collection { filter ??= {}; return executeOperation( - getTopology(this), + this, new CountDocumentsOperation( this as TODO_NODE_3286, filter as Document, @@ -1193,7 +1193,7 @@ export class Collection { filter ??= {}; return executeOperation( - getTopology(this), + this, new DistinctOperation( this as TODO_NODE_3286, key as TODO_NODE_3286, @@ -1221,7 +1221,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new IndexesOperation(this as TODO_NODE_3286, resolveOptions(this, options)), callback ); @@ -1245,7 +1245,7 @@ export class Collection { options = options ?? {}; return executeOperation( - getTopology(this), + this, new CollStatsOperation(this as TODO_NODE_3286, options), callback ); @@ -1277,7 +1277,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new FindOneAndDeleteOperation( this as TODO_NODE_3286, filter, @@ -1324,7 +1324,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new FindOneAndReplaceOperation( this as TODO_NODE_3286, filter, @@ -1372,7 +1372,7 @@ export class Collection { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new FindOneAndUpdateOperation( this as TODO_NODE_3286, filter, @@ -1495,7 +1495,7 @@ export class Collection { } return executeOperation( - getTopology(this), + this, new MapReduceOperation( this as TODO_NODE_3286, map, @@ -1636,7 +1636,7 @@ export class Collection { filter ??= {}; return executeOperation( - getTopology(this), + this, new CountOperation( MongoDBNamespace.fromString(this.namespace), filter, diff --git a/src/db.ts b/src/db.ts index 9aa71abbf4..7ced6684a3 100644 --- a/src/db.ts +++ b/src/db.ts @@ -258,7 +258,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new CreateCollectionOperation(this, name, resolveOptions(this, options)) as TODO_NODE_3286, callback ) as TODO_NODE_3286; @@ -286,11 +286,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); // Intentionally, we do not inherit options from parent for this operation. - return executeOperation( - getTopology(this), - new RunCommandOperation(this, command, options ?? {}), - callback - ); + return executeOperation(this, new RunCommandOperation(this, command, options ?? {}), callback); } /** @@ -359,7 +355,7 @@ export class Db { ): Promise | void { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new DbStatsOperation(this, resolveOptions(this, options)), callback ); @@ -438,7 +434,7 @@ export class Db { options.new_collection = true; return executeOperation( - getTopology(this), + this, new RenameOperation( this.collection(fromCollection) as TODO_NODE_3286, toCollection, @@ -467,7 +463,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new DropCollectionOperation(this, name, resolveOptions(this, options)), callback ); @@ -490,7 +486,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new DropDatabaseOperation(this, resolveOptions(this, options)), callback ); @@ -513,7 +509,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new CollectionsOperation(this, resolveOptions(this, options)), callback ); @@ -549,7 +545,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new CreateIndexOperation(this, name, indexSpec, resolveOptions(this, options)), callback ); @@ -595,7 +591,7 @@ export class Db { } return executeOperation( - getTopology(this), + this, new AddUserOperation(this, username, password, resolveOptions(this, options)), callback ); @@ -620,7 +616,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new RemoveUserOperation(this, username, resolveOptions(this, options)), callback ); @@ -652,7 +648,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new SetProfilingLevelOperation(this, level, resolveOptions(this, options)), callback ); @@ -675,7 +671,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new ProfilingLevelOperation(this, resolveOptions(this, options)), callback ); @@ -704,7 +700,7 @@ export class Db { if (typeof options === 'function') (callback = options), (options = {}); return executeOperation( - getTopology(this), + this, new IndexInformationOperation(this, name, resolveOptions(this, options)), callback ); @@ -715,7 +711,7 @@ export class Db { * @deprecated This function is deprecated and will be removed in the next major version. */ unref(): void { - getTopology(this).unref(); + this.unref(); } /** diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index c1750726fc..b12b74ee2a 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -19,9 +19,15 @@ import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection'; -import type { Topology } from '../sdam/topology'; +import { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; -import { Callback, maybePromise, supportsRetryableWrites } from '../utils'; +import { + Callback, + getTopology, + maybePromise, + supportsRetryableWrites, + TopologyProvider +} from '../utils'; import { AbstractOperation, Aspect } from './operation'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; @@ -59,31 +65,48 @@ export interface ExecutionResult { */ export function executeOperation< T extends AbstractOperation, - TResult = ResultTypeFromOperation ->(topology: Topology, operation: T): Promise; + TResult = ResultTypeFromOperation, + TSchema = Document +>(topology: Topology | TopologyProvider, operation: T): Promise; export function executeOperation< T extends AbstractOperation, - TResult = ResultTypeFromOperation ->(topology: Topology, operation: T, callback: Callback): void; + TResult = ResultTypeFromOperation, + TSchema = Document +>(topology: Topology | TopologyProvider, operation: T, callback: Callback): void; export function executeOperation< T extends AbstractOperation, - TResult = ResultTypeFromOperation ->(topology: Topology, operation: T, callback?: Callback): Promise | void; + TResult = ResultTypeFromOperation, + TSchema = Document +>( + topology: Topology | TopologyProvider, + operation: T, + callback?: Callback +): Promise | void; export function executeOperation< T extends AbstractOperation, - TResult = ResultTypeFromOperation ->(topology: Topology, operation: T, callback?: Callback): Promise | void { + TResult = ResultTypeFromOperation, + TSchema = Document +>( + topology: Topology | TopologyProvider, + operation: T, + callback?: Callback +): Promise | void { if (!(operation instanceof AbstractOperation)) { // TODO(NODE-3483): Extend MongoRuntimeError throw new MongoRuntimeError('This method requires a valid operation instance'); } return maybePromise(callback, callback => { + try { + topology = topology instanceof Topology ? topology : getTopology(topology); + } catch (error) { + return callback(error); + } if (topology.shouldCheckForSessionSupport()) { return topology.selectServer(ReadPreference.primaryPreferred, {}, err => { if (err) return callback(err); - executeOperation(topology, operation, callback); + executeOperation(topology, operation, callback); }); } diff --git a/src/operations/indexes.ts b/src/operations/indexes.ts index 78c0ceba1c..f686d59aa2 100644 --- a/src/operations/indexes.ts +++ b/src/operations/indexes.ts @@ -453,7 +453,7 @@ export class ListIndexesCursor extends AbstractCursor { session }); - executeOperation(getTopology(this.parent), operation, (err, response) => { + executeOperation(this.parent, operation, (err, response) => { if (err || response == null) return callback(err); // TODO: NODE-2882 diff --git a/src/operations/list_collections.ts b/src/operations/list_collections.ts index 2e22018383..8e32c46405 100644 --- a/src/operations/list_collections.ts +++ b/src/operations/list_collections.ts @@ -157,7 +157,7 @@ export class ListCollectionsCursor< session }); - executeOperation(getTopology(this.parent), operation, (err, response) => { + executeOperation(this.parent, operation, (err, response) => { if (err || response == null) return callback(err); // TODO: NODE-2882 diff --git a/src/utils.ts b/src/utils.ts index 5fe1edbfbc..c856079d4c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -326,6 +326,11 @@ export function decorateWithExplain(command: Document, explain: Explain): Docume return { explain: command, verbosity: explain.verbosity }; } +/** + * @internal + */ +export type TopologyProvider = MongoClient | Db | Collection; + /** * A helper function to get the topology from a given provider. Throws * if the topology cannot be found. From a618dfe916809facfb44c439b583e79f897a98a7 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 15 Mar 2022 14:50:20 -0400 Subject: [PATCH 2/8] test: add tests for executing operations while client is disconnected --- .../connection.test.js | 43 ++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.js b/test/integration/connection-monitoring-and-pooling/connection.test.js index 37f83a59f5..474a5718b2 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.js +++ b/test/integration/connection-monitoring-and-pooling/connection.test.js @@ -452,7 +452,27 @@ describe('Connection', function () { }) ); - it( + it.only('throws when attempting an operation if the client is not connected', function (done) { + const client = this.configuration.newClient(); + const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); + collection.insertOne({ a: 2 }, err => { + expect(err).to.match(/must be connected/); + done(); + }); + }); + + it.only('throws when attempting an operation if the client is not connected (promises)', async function () { + const client = this.configuration.newClient(); + const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); + + try { + await collection.insertOne({ a: 2 }); + } catch (err) { + expect(err).to.match(/must be connected/); + } + }); + + it.only( 'should correctly fail on retry when client has been closed', withClient(function (client, done) { const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); @@ -463,13 +483,26 @@ describe('Connection', function () { client.close(true, function (err) { expect(err).to.not.exist; - expect(() => { - collection.insertOne({ a: 2 }); - }).to.throw(/must be connected/); - done(); + collection.insertOne({ a: 2 }, err => { + expect(err).to.match(/must be connected/); + done(); + }); }); }); }) ); + + it.only('should correctly fail on retry when client has been closed (promises)', async function () { + const client = await this.configuration.newClient().connect(); + const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); + await collection.insertOne({ a: 1 }); + await client.close(true); + + try { + await collection.insertOne({ a: 2 }); + } catch (err) { + expect(err).to.match(/must be connected/); + } + }); }); }); From d2999abcabc48484bbb40c1fcea9931bb105c324 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 15 Mar 2022 14:50:55 -0400 Subject: [PATCH 3/8] fix: remove .only from tests --- .../connection-monitoring-and-pooling/connection.test.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.js b/test/integration/connection-monitoring-and-pooling/connection.test.js index 474a5718b2..08b42290bb 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.js +++ b/test/integration/connection-monitoring-and-pooling/connection.test.js @@ -452,7 +452,7 @@ describe('Connection', function () { }) ); - it.only('throws when attempting an operation if the client is not connected', function (done) { + it('throws when attempting an operation if the client is not connected', function (done) { const client = this.configuration.newClient(); const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); collection.insertOne({ a: 2 }, err => { @@ -461,7 +461,7 @@ describe('Connection', function () { }); }); - it.only('throws when attempting an operation if the client is not connected (promises)', async function () { + it('throws when attempting an operation if the client is not connected (promises)', async function () { const client = this.configuration.newClient(); const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); @@ -472,7 +472,7 @@ describe('Connection', function () { } }); - it.only( + it( 'should correctly fail on retry when client has been closed', withClient(function (client, done) { const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); @@ -492,7 +492,7 @@ describe('Connection', function () { }) ); - it.only('should correctly fail on retry when client has been closed (promises)', async function () { + it('should correctly fail on retry when client has been closed (promises)', async function () { const client = await this.configuration.newClient().connect(); const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); await collection.insertOne({ a: 1 }); From fc0eedfcee489877034d6199b068b785f8fd94f8 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 15 Mar 2022 15:18:00 -0400 Subject: [PATCH 4/8] fix: wrap async calls to gettopology in error handling in other parts of driver --- src/operations/add_user.ts | 9 ++++++++- src/operations/common_functions.ts | 9 ++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/operations/add_user.ts b/src/operations/add_user.ts index 9c9077e8fb..3d0c08e79e 100644 --- a/src/operations/add_user.ts +++ b/src/operations/add_user.ts @@ -79,7 +79,14 @@ export class AddUserOperation extends CommandOperation { roles = Array.isArray(options.roles) ? options.roles : [options.roles]; } - const digestPassword = getTopology(db).lastHello().maxWireVersion >= 7; + let topology; + try { + topology = getTopology(db); + } catch (error) { + return callback(error); + } + + const digestPassword = topology.lastHello().maxWireVersion >= 7; let userPassword = password; diff --git a/src/operations/common_functions.ts b/src/operations/common_functions.ts index 5ce2aa1304..928e00fd02 100644 --- a/src/operations/common_functions.ts +++ b/src/operations/common_functions.ts @@ -40,8 +40,15 @@ export function indexInformation( // If we specified full information const full = options.full == null ? false : options.full; + let topology; + try { + topology = getTopology(db); + } catch (error) { + return callback(error); + } + // Did the user destroy the topology - if (getTopology(db).isDestroyed()) return callback(new MongoTopologyClosedError()); + if (topology.isDestroyed()) return callback(new MongoTopologyClosedError()); // Process all the results from the index command and collection function processResults(indexes: any) { // Contains all the information From 2bb307d032bfda4ec25e41f6724de5e3062a3b2c Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 15 Mar 2022 15:43:12 -0400 Subject: [PATCH 5/8] fix: use topologyprovider type in getTopology definition --- src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index c856079d4c..bd951a1a23 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -336,7 +336,7 @@ export type TopologyProvider = MongoClient | Db | Collection; * if the topology cannot be found. * @internal */ -export function getTopology(provider: MongoClient | Db | Collection): Topology { +export function getTopology(provider: TopologyProvider): Topology { if (`topology` in provider && provider.topology) { return provider.topology; } else if ('client' in provider.s && provider.s.client.topology) { From 84c9e8ace71bf581606856733b60680cb9421ec3 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 15 Mar 2022 15:47:34 -0400 Subject: [PATCH 6/8] add back mistakenly removed call to getTopology --- src/db.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.ts b/src/db.ts index 7ced6684a3..539cbb745d 100644 --- a/src/db.ts +++ b/src/db.ts @@ -711,7 +711,7 @@ export class Db { * @deprecated This function is deprecated and will be removed in the next major version. */ unref(): void { - this.unref(); + getTopology(this).unref(); } /** From 1ada154186b584b6076ae20a6f155a532cc9dc72 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 16 Mar 2022 15:43:22 -0400 Subject: [PATCH 7/8] fix: address Neal's comments --- src/operations/execute_operation.ts | 26 +++++++++---------- src/utils.ts | 5 ++-- .../connection.test.js | 24 ++++++++--------- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index b12b74ee2a..e97b4165f2 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -65,29 +65,25 @@ export interface ExecutionResult { */ export function executeOperation< T extends AbstractOperation, - TResult = ResultTypeFromOperation, - TSchema = Document ->(topology: Topology | TopologyProvider, operation: T): Promise; + TResult = ResultTypeFromOperation +>(topologyProvider: Topology | TopologyProvider, operation: T): Promise; export function executeOperation< T extends AbstractOperation, - TResult = ResultTypeFromOperation, - TSchema = Document ->(topology: Topology | TopologyProvider, operation: T, callback: Callback): void; + TResult = ResultTypeFromOperation +>(topologyProvider: Topology | TopologyProvider, operation: T, callback: Callback): void; export function executeOperation< T extends AbstractOperation, - TResult = ResultTypeFromOperation, - TSchema = Document + TResult = ResultTypeFromOperation >( - topology: Topology | TopologyProvider, + topologyProvider: Topology | TopologyProvider, operation: T, callback?: Callback ): Promise | void; export function executeOperation< T extends AbstractOperation, - TResult = ResultTypeFromOperation, - TSchema = Document + TResult = ResultTypeFromOperation >( - topology: Topology | TopologyProvider, + topologyProvider: Topology | TopologyProvider, operation: T, callback?: Callback ): Promise | void { @@ -97,8 +93,10 @@ export function executeOperation< } return maybePromise(callback, callback => { + let topology: Topology | TopologyProvider; try { - topology = topology instanceof Topology ? topology : getTopology(topology); + topology = + topologyProvider instanceof Topology ? topologyProvider : getTopology(topologyProvider); } catch (error) { return callback(error); } @@ -106,7 +104,7 @@ export function executeOperation< return topology.selectServer(ReadPreference.primaryPreferred, {}, err => { if (err) return callback(err); - executeOperation(topology, operation, callback); + executeOperation(topology, operation, callback); }); } diff --git a/src/utils.ts b/src/utils.ts index bd951a1a23..41cd68cedc 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -329,14 +329,15 @@ export function decorateWithExplain(command: Document, explain: Explain): Docume /** * @internal */ -export type TopologyProvider = MongoClient | Db | Collection; +export type TopologyProvider = MongoClient | Db | Collection; /** * A helper function to get the topology from a given provider. Throws * if the topology cannot be found. + * @throws MongoNotConnectedError * @internal */ -export function getTopology(provider: TopologyProvider): Topology { +export function getTopology(provider: TopologyProvider): Topology { if (`topology` in provider && provider.topology) { return provider.topology; } else if ('client' in provider.s && provider.s.client.topology) { diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.js b/test/integration/connection-monitoring-and-pooling/connection.test.js index 08b42290bb..cfefbe987d 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.js +++ b/test/integration/connection-monitoring-and-pooling/connection.test.js @@ -1,6 +1,10 @@ 'use strict'; -const { ServerHeartbeatStartedEvent, MongoClient } = require('../../../src'); +const { + ServerHeartbeatStartedEvent, + MongoClient, + MongoNotConnectedError +} = require('../../../src'); const { Connection } = require('../../../src/cmap/connection'); const { connect } = require('../../../src/cmap/connect'); const { expect } = require('chai'); @@ -456,7 +460,7 @@ describe('Connection', function () { const client = this.configuration.newClient(); const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); collection.insertOne({ a: 2 }, err => { - expect(err).to.match(/must be connected/); + expect(err).to.be.instanceof(MongoNotConnectedError); done(); }); }); @@ -465,11 +469,8 @@ describe('Connection', function () { const client = this.configuration.newClient(); const collection = client.db('shouldCorrectlyFailOnRetry').collection('test'); - try { - await collection.insertOne({ a: 2 }); - } catch (err) { - expect(err).to.match(/must be connected/); - } + const err = await collection.insertOne({ a: 2 }).catch(err => err); + expect(err).to.be.instanceof(MongoNotConnectedError); }); it( @@ -484,7 +485,7 @@ describe('Connection', function () { expect(err).to.not.exist; collection.insertOne({ a: 2 }, err => { - expect(err).to.match(/must be connected/); + expect(err).to.be.instanceof(MongoNotConnectedError); done(); }); }); @@ -498,11 +499,8 @@ describe('Connection', function () { await collection.insertOne({ a: 1 }); await client.close(true); - try { - await collection.insertOne({ a: 2 }); - } catch (err) { - expect(err).to.match(/must be connected/); - } + const err = await collection.insertOne({ a: 2 }).catch(err => err); + expect(err).to.be.instanceof(MongoNotConnectedError); }); }); }); From 66acf1957e39bf93bf50f8a25ed721f5d5f4c155 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 17 Mar 2022 17:00:57 -0400 Subject: [PATCH 8/8] fix: address circular dependencies --- src/bulk/common.ts | 8 ++++---- src/change_stream.ts | 2 +- src/cursor/abstract_cursor.ts | 2 +- src/cursor/aggregation_cursor.ts | 4 ++-- src/cursor/find_cursor.ts | 6 +++--- src/operations/execute_operation.ts | 17 ++++++++--------- src/operations/map_reduce.ts | 5 +---- src/sessions.ts | 4 ++-- src/utils.ts | 16 +++++++++++++--- 9 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 0a1a8171eb..36afe6d04d 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -655,19 +655,19 @@ function executeCommands( try { if (isInsertBatch(batch)) { executeOperation( - bulkOperation.s.topology, + bulkOperation.s.collection, new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions), resultHandler ); } else if (isUpdateBatch(batch)) { executeOperation( - bulkOperation.s.topology, + bulkOperation.s.collection, new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions), resultHandler ); } else if (isDeleteBatch(batch)) { executeOperation( - bulkOperation.s.topology, + bulkOperation.s.collection, new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions), resultHandler ); @@ -1285,7 +1285,7 @@ export abstract class BulkOperationBase { const finalOptions = { ...this.s.options, ...options }; const operation = new BulkWriteShimOperation(this, finalOptions); - return executeOperation(this.s.topology, operation, callback); + return executeOperation(this.s.collection, operation, callback); } /** diff --git a/src/change_stream.ts b/src/change_stream.ts index 57ee4f7b5e..73b4b78dd9 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -503,7 +503,7 @@ export class ChangeStreamCursor extends Abs session }); - executeOperation(this.topology, aggregateOperation, (err, response) => { + executeOperation(session, aggregateOperation, (err, response) => { if (err || response == null) { return callback(err); } diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index ea8e769495..1ef82f13f0 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -618,7 +618,7 @@ export abstract class AbstractCursor< batchSize }); - executeOperation(this.topology, getMoreOperation, callback); + executeOperation(this, getMoreOperation, callback); } } diff --git a/src/cursor/aggregation_cursor.ts b/src/cursor/aggregation_cursor.ts index 4cf4c85b88..311d8a6a33 100644 --- a/src/cursor/aggregation_cursor.ts +++ b/src/cursor/aggregation_cursor.ts @@ -68,7 +68,7 @@ export class AggregationCursor extends AbstractCursor { + executeOperation(this, aggregateOperation, (err, response) => { if (err || response == null) return callback(err); // TODO: NODE-2882 @@ -88,7 +88,7 @@ export class AggregationCursor extends AbstractCursor extends AbstractCursor { session }); - executeOperation(this.topology, findOperation, (err, response) => { + executeOperation(this, findOperation, (err, response) => { if (err || response == null) return callback(err); // TODO: We only need this for legacy queries that do not support `limit`, maybe @@ -143,7 +143,7 @@ export class FindCursor extends AbstractCursor { options = options ?? {}; return executeOperation( - this.topology, + this, new CountOperation(this.namespace, this[kFilter], { ...this[kBuiltOptions], // NOTE: order matters here, we may need to refine this ...this.cursorOptions, @@ -165,7 +165,7 @@ export class FindCursor extends AbstractCursor { if (verbosity == null) verbosity = true; return executeOperation( - this.topology, + this, new FindOperation(undefined, this.namespace, this[kFilter], { ...this[kBuiltOptions], // NOTE: order matters here, we may need to refine this ...this.cursorOptions, diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index e97b4165f2..722d322f72 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -19,7 +19,7 @@ import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection'; -import { Topology } from '../sdam/topology'; +import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import { Callback, @@ -66,16 +66,16 @@ export interface ExecutionResult { export function executeOperation< T extends AbstractOperation, TResult = ResultTypeFromOperation ->(topologyProvider: Topology | TopologyProvider, operation: T): Promise; +>(topologyProvider: TopologyProvider, operation: T): Promise; export function executeOperation< T extends AbstractOperation, TResult = ResultTypeFromOperation ->(topologyProvider: Topology | TopologyProvider, operation: T, callback: Callback): void; +>(topologyProvider: TopologyProvider, operation: T, callback: Callback): void; export function executeOperation< T extends AbstractOperation, TResult = ResultTypeFromOperation >( - topologyProvider: Topology | TopologyProvider, + topologyProvider: TopologyProvider, operation: T, callback?: Callback ): Promise | void; @@ -83,7 +83,7 @@ export function executeOperation< T extends AbstractOperation, TResult = ResultTypeFromOperation >( - topologyProvider: Topology | TopologyProvider, + topologyProvider: TopologyProvider, operation: T, callback?: Callback ): Promise | void { @@ -93,10 +93,9 @@ export function executeOperation< } return maybePromise(callback, callback => { - let topology: Topology | TopologyProvider; + let topology: Topology; try { - topology = - topologyProvider instanceof Topology ? topologyProvider : getTopology(topologyProvider); + topology = getTopology(topologyProvider); } catch (error) { return callback(error); } @@ -104,7 +103,7 @@ export function executeOperation< return topology.selectServer(ReadPreference.primaryPreferred, {}, err => { if (err) return callback(err); - executeOperation(topology, operation, callback); + executeOperation(topologyProvider, operation, callback); }); } diff --git a/src/operations/map_reduce.ts b/src/operations/map_reduce.ts index 48ad4c9894..f3134f45ad 100644 --- a/src/operations/map_reduce.ts +++ b/src/operations/map_reduce.ts @@ -1,7 +1,6 @@ import type { ObjectId } from '../bson'; import { Code, Document } from '../bson'; import type { Collection } from '../collection'; -import { Db } from '../db'; import { MongoCompatibilityError, MongoServerError } from '../error'; import { ReadPreference, ReadPreferenceMode } from '../read_preference'; import type { Server } from '../sdam/server'; @@ -210,9 +209,7 @@ export class MapReduceOperation extends CommandOperation if (result.result != null && typeof result.result === 'object') { const doc = result.result; // Return a collection from another db - collection = new Db(coll.s.db.s.client, doc.db, coll.s.db.s.options).collection( - doc.collection - ); + collection = coll.s.db.s.client.db(doc.db, coll.s.db.s.options).collection(doc.collection); } else { // Create a collection object that wraps the result collection collection = coll.s.db.collection(result.result); diff --git a/src/sessions.ts b/src/sessions.ts index 7b05edee40..7c853740f7 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -757,7 +757,7 @@ function endTransaction( // send the command executeOperation( - session.topology, + session, new RunAdminCommandOperation(undefined, command, { session, readPreference: ReadPreference.primary, @@ -781,7 +781,7 @@ function endTransaction( } return executeOperation( - session.topology, + session, new RunAdminCommandOperation(undefined, command, { session, readPreference: ReadPreference.primary, diff --git a/src/utils.ts b/src/utils.ts index 41cd68cedc..129d3995b7 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -8,6 +8,8 @@ import type { Connection } from './cmap/connection'; import { MAX_SUPPORTED_WIRE_VERSION } from './cmap/wire_protocol/constants'; import type { Collection } from './collection'; import { LEGACY_HELLO_COMMAND } from './constants'; +import type { AbstractCursor } from './cursor/abstract_cursor'; +import type { FindCursor } from './cursor/find_cursor'; import type { Db } from './db'; import { AnyError, @@ -28,6 +30,7 @@ import { ReadPreference } from './read_preference'; import { ServerType } from './sdam/common'; import type { Server } from './sdam/server'; import type { Topology } from './sdam/topology'; +import type { ClientSession } from './sessions'; import { W, WriteConcern, WriteConcernOptions } from './write_concern'; /** @@ -329,7 +332,13 @@ export function decorateWithExplain(command: Document, explain: Explain): Docume /** * @internal */ -export type TopologyProvider = MongoClient | Db | Collection; +export type TopologyProvider = + | MongoClient + | ClientSession + | FindCursor + | AbstractCursor + | Collection + | Db; /** * A helper function to get the topology from a given provider. Throws @@ -338,11 +347,12 @@ export type TopologyProvider = MongoClient | Db | Collection; * @internal */ export function getTopology(provider: TopologyProvider): Topology { + // MongoClient or ClientSession or AbstractCursor if (`topology` in provider && provider.topology) { return provider.topology; - } else if ('client' in provider.s && provider.s.client.topology) { + } else if ('s' in provider && 'client' in provider.s && provider.s.client.topology) { return provider.s.client.topology; - } else if ('db' in provider.s && provider.s.db.s.client.topology) { + } else if ('s' in provider && 'db' in provider.s && provider.s.db.s.client.topology) { return provider.s.db.s.client.topology; }