From 7aa3008d58b9d9869c2ea4af7809fa6b5cfbf6f4 Mon Sep 17 00:00:00 2001 From: Daria Pardue <81593090+dariakp@users.noreply.github.com> Date: Fri, 16 Jul 2021 10:10:12 -0400 Subject: [PATCH] fix(NODE-3393): snapshot time not applied if distinct executed first (#2908) --- src/sessions.ts | 13 +- test/functional/sessions.test.js | 26 +--- .../unified-spec-runner/operations.ts | 122 ++++++------------ 3 files changed, 50 insertions(+), 111 deletions(-) diff --git a/src/sessions.ts b/src/sessions.ts index 2b5324061f..fb59f75952 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -891,11 +891,12 @@ export function updateSessionFromResponse(session: ClientSession, document: Docu session.transaction._recoveryToken = document.recoveryToken; } - if ( - document.cursor?.atClusterTime && - session?.[kSnapshotEnabled] && - session[kSnapshotTime] === undefined - ) { - session[kSnapshotTime] = document.cursor.atClusterTime; + if (session?.[kSnapshotEnabled] && session[kSnapshotTime] === undefined) { + // find and aggregate commands return atClusterTime on the cursor + // distinct includes it in the response body + const atClusterTime = document.cursor?.atClusterTime || document.atClusterTime; + if (atClusterTime) { + session[kSnapshotTime] = atClusterTime; + } } } diff --git a/test/functional/sessions.test.js b/test/functional/sessions.test.js index f0e0fb5cb7..5be0843d3c 100644 --- a/test/functional/sessions.test.js +++ b/test/functional/sessions.test.js @@ -200,35 +200,11 @@ describe('Sessions - functional', function () { for (const sessionTests of loadSpecTests(path.join('sessions', 'unified'))) { expect(sessionTests).to.be.an('object'); context(String(sessionTests.description), function () { - // TODO: NODE-3393 fix test runner to apply session to all operations - const skipTestMap = { - 'snapshot-sessions': [ - 'countDocuments operation with snapshot', - 'Distinct operation with snapshot', - 'Mixed operation with snapshot' - ], - 'snapshot-sessions-not-supported-client-error': [ - 'Client error on distinct with snapshot' - ], - 'snapshot-sessions-not-supported-server-error': [ - 'Server returns an error on distinct with snapshot' - ], - 'snapshot-sessions-unsupported-ops': [ - 'Server returns an error on listCollections with snapshot', - 'Server returns an error on listDatabases with snapshot', - 'Server returns an error on listIndexes with snapshot', - 'Server returns an error on runCommand with snapshot', - 'Server returns an error on findOneAndUpdate with snapshot', - 'Server returns an error on deleteOne with snapshot', - 'Server returns an error on updateOne with snapshot' - ] - }; - const testsToSkip = skipTestMap[sessionTests.description] || []; for (const test of sessionTests.tests) { it(String(test.description), { metadata: { sessions: { skipLeakTests: true } }, test: async function () { - await runUnifiedTest(this, sessionTests, test, testsToSkip); + await runUnifiedTest(this, sessionTests, test); } }); } diff --git a/test/functional/unified-spec-runner/operations.ts b/test/functional/unified-spec-runner/operations.ts index a4dda6f5ed..16ac952f88 100644 --- a/test/functional/unified-spec-runner/operations.ts +++ b/test/functional/unified-spec-runner/operations.ts @@ -4,14 +4,13 @@ import { Collection, Db, GridFSFile, MongoClient, ObjectId, AbstractCursor } fro import { ReadConcern } from '../../../src/read_concern'; import { ReadPreference } from '../../../src/read_preference'; import { WriteConcern } from '../../../src/write_concern'; -import { Document, InsertOneOptions } from '../../../src'; +import { Document } from '../../../src'; import { EventCollector } from '../../tools/utils'; import { EntitiesMap } from './entities'; import { expectErrorCheck, resultCheck } from './match'; import type { OperationDescription } from './schema'; import { CommandStartedEvent } from '../../../src/cmap/command_monitoring_events'; import { translateOptions } from './unified-utils'; -import { getSymbolFrom } from '../../tools/utils'; interface OperationFunctionParams { client: MongoClient; @@ -22,18 +21,6 @@ interface OperationFunctionParams { type RunOperationFn = (p: OperationFunctionParams) => Promise; export const operations = new Map(); -function executeWithPotentialSession( - entities: EntitiesMap, - operation: OperationDescription, - cursor: AbstractCursor -) { - const session = entities.getEntity('session', operation.arguments.session, false); - if (session) { - cursor.session = session; - } - return cursor.toArray(); -} - operations.set('abortTransaction', async ({ entities, operation }) => { const session = entities.getEntity('session', operation.object); return session.abortTransaction(); @@ -44,18 +31,9 @@ operations.set('aggregate', async ({ entities, operation }) => { if (!(dbOrCollection instanceof Db || dbOrCollection instanceof Collection)) { throw new Error(`Operation object '${operation.object}' must be a db or collection`); } - const cursor = dbOrCollection.aggregate(operation.arguments.pipeline, { - allowDiskUse: operation.arguments.allowDiskUse, - batchSize: operation.arguments.batchSize, - bypassDocumentValidation: operation.arguments.bypassDocumentValidation, - maxTimeMS: operation.arguments.maxTimeMS, - maxAwaitTimeMS: operation.arguments.maxAwaitTimeMS, - collation: operation.arguments.collation, - hint: operation.arguments.hint, - let: operation.arguments.let, - out: operation.arguments.out - }); - return executeWithPotentialSession(entities, operation, cursor); + const { pipeline, ...opts } = operation.arguments; + const cursor = dbOrCollection.aggregate(pipeline, opts); + return cursor.toArray(); }); operations.set('assertCollectionExists', async ({ operation, client }) => { @@ -139,27 +117,27 @@ operations.set('assertSameLsidOnLastTwoCommands', async ({ entities, operation } }); operations.set('assertSessionDirty', async ({ entities, operation }) => { - const session = entities.getEntity('session', operation.arguments.session); + const session = operation.arguments.session; expect(session.serverSession.isDirty).to.be.true; }); operations.set('assertSessionNotDirty', async ({ entities, operation }) => { - const session = entities.getEntity('session', operation.arguments.session); + const session = operation.arguments.session; expect(session.serverSession.isDirty).to.be.false; }); operations.set('assertSessionPinned', async ({ entities, operation }) => { - const session = entities.getEntity('session', operation.arguments.session); + const session = operation.arguments.session; expect(session.transaction.isPinned).to.be.true; }); operations.set('assertSessionUnpinned', async ({ entities, operation }) => { - const session = entities.getEntity('session', operation.arguments.session); + const session = operation.arguments.session; expect(session.transaction.isPinned).to.be.false; }); operations.set('assertSessionTransactionState', async ({ entities, operation }) => { - const session = entities.getEntity('session', operation.arguments.session); + const session = operation.arguments.session; const transactionStateTranslation = { none: 'NO_TRANSACTION', @@ -236,17 +214,14 @@ operations.set('createChangeStream', async ({ entities, operation }) => { operations.set('createCollection', async ({ entities, operation }) => { const db = entities.getEntity('db', operation.object); - const { session, collection, ...opts } = operation.arguments; - await db.createCollection(collection, { - session: entities.getEntity('session', session, false), - ...opts - }); + const { collection, ...opts } = operation.arguments; + await db.createCollection(collection, opts); }); operations.set('createFindCursor', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - const { filter, sort, batchSize, limit, let: vars } = operation.arguments; - const cursor = collection.find(filter, { sort, batchSize, limit, let: vars }); + const { filter, ...opts } = operation.arguments; + const cursor = collection.find(filter, opts); // The spec dictates that we create the cursor and force the find command // to execute, but don't move the cursor forward. hasNext() accomplishes // this. @@ -256,11 +231,8 @@ operations.set('createFindCursor', async ({ entities, operation }) => { operations.set('createIndex', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - const session = entities.getEntity('session', operation.arguments.session, false); - await collection.createIndex(operation.arguments.keys, { - session, - name: operation.arguments.name - }); + const { keys, ...opts } = operation.arguments; + await collection.createIndex(keys, opts); }); operations.set('deleteOne', async ({ entities, operation }) => { @@ -271,7 +243,8 @@ operations.set('deleteOne', async ({ entities, operation }) => { operations.set('dropCollection', async ({ entities, operation }) => { const db = entities.getEntity('db', operation.object); - return await db.dropCollection(operation.arguments.collection); + const { collection, ...opts } = operation.arguments; + return await db.dropCollection(collection, opts); }); operations.set('endSession', async ({ entities, operation }) => { @@ -281,9 +254,8 @@ operations.set('endSession', async ({ entities, operation }) => { operations.set('find', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - const { filter, sort, batchSize, limit, let: vars } = operation.arguments; - const cursor = collection.find(filter, { sort, batchSize, limit, let: vars }); - return executeWithPotentialSession(entities, operation, cursor); + const { filter, ...opts } = operation.arguments; + return collection.find(filter, opts).toArray(); }); operations.set('findOneAndReplace', async ({ entities, operation }) => { @@ -305,27 +277,14 @@ operations.set('failPoint', async ({ entities, operation }) => { operations.set('insertOne', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - - const session = entities.getEntity('session', operation.arguments.session, false); - - const options = { - session - } as InsertOneOptions; - - return collection.insertOne(operation.arguments.document, options); + const { document, ...opts } = operation.arguments; + return collection.insertOne(document, opts); }); operations.set('insertMany', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - - const session = entities.getEntity('session', operation.arguments.session, false); - - const options = { - session, - ordered: operation.arguments.ordered ?? true - }; - - return collection.insertMany(operation.arguments.documents, options); + const { documents, ...opts } = operation.arguments; + return collection.insertMany(documents, opts); }); operations.set('iterateUntilDocumentOrError', async ({ entities, operation }) => { @@ -350,7 +309,7 @@ operations.set('listCollections', async ({ entities, operation }) => { operations.set('listDatabases', async ({ entities, operation }) => { const client = entities.getEntity('client', operation.object); - return client.db().admin().listDatabases(); + return client.db().admin().listDatabases(operation.arguments); }); operations.set('listIndexes', async ({ entities, operation }) => { @@ -360,12 +319,8 @@ operations.set('listIndexes', async ({ entities, operation }) => { operations.set('replaceOne', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - return collection.replaceOne(operation.arguments.filter, operation.arguments.replacement, { - bypassDocumentValidation: operation.arguments.bypassDocumentValidation, - collation: operation.arguments.collation, - hint: operation.arguments.hint, - upsert: operation.arguments.upsert - }); + const { filter, replacement, ...opts } = operation.arguments; + return collection.replaceOne(filter, replacement, opts); }); operations.set('startTransaction', async ({ entities, operation }) => { @@ -374,7 +329,7 @@ operations.set('startTransaction', async ({ entities, operation }) => { }); operations.set('targetedFailPoint', async ({ entities, operation }) => { - const session = entities.getEntity('session', operation.arguments.session); + const session = operation.arguments.session; expect(session.transaction.isPinned, 'Session must be pinned for a targetedFailPoint').to.be.true; await entities.failPoints.enableFailPoint( session.transaction._pinnedServer.s.description.hostAddress, @@ -433,20 +388,20 @@ operations.set('withTransaction', async ({ entities, operation, client }) => { operations.set('countDocuments', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - return collection.countDocuments(operation.arguments.filter as Document); + const { filter, ...opts } = operation.arguments; + return collection.countDocuments(filter, opts); }); operations.set('deleteMany', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - return collection.deleteMany(operation.arguments.filter); + const { filter, ...opts } = operation.arguments; + return collection.deleteMany(filter, opts); }); operations.set('distinct', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - return collection.distinct( - operation.arguments.fieldName as string, - operation.arguments.filter as Document - ); + const { fieldName, filter, ...opts } = operation.arguments; + return collection.distinct(fieldName, filter, opts); }); operations.set('estimatedDocumentCount', async ({ entities, operation }) => { @@ -456,12 +411,14 @@ operations.set('estimatedDocumentCount', async ({ entities, operation }) => { operations.set('findOneAndDelete', async ({ entities, operation }) => { const collection = entities.getEntity('collection', operation.object); - return collection.findOneAndDelete(operation.arguments.filter); + const { filter, ...opts } = operation.arguments; + return collection.findOneAndDelete(filter, opts); }); operations.set('runCommand', async ({ entities, operation }: OperationFunctionParams) => { const db = entities.getEntity('db', operation.object); - return db.command(operation.arguments.command); + const { command, ...opts } = operation.arguments; + return db.command(command, opts); }); operations.set('updateMany', async ({ entities, operation }) => { @@ -484,6 +441,11 @@ export async function executeOperationAndCheck( const opFunc = operations.get(operation.name); expect(opFunc, `Unknown operation: ${operation.name}`).to.exist; + if (operation.arguments?.session) { + const session = entities.getEntity('session', operation.arguments.session, false); + operation.arguments.session = session; + } + let result; try {