Skip to content

Commit

Permalink
fix(NODE-3393): snapshot time not applied if distinct executed first (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dariakp committed Jul 16, 2021
1 parent b67af3c commit 7aa3008
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 111 deletions.
13 changes: 7 additions & 6 deletions src/sessions.ts
Expand Up @@ -891,11 +891,12 @@ export function updateSessionFromResponse(session: ClientSession, document: Docu
session.transaction._recoveryToken = document.recoveryToken;
}

if (
document.cursor?.atClusterTime &&
session?.[kSnapshotEnabled] &&
session[kSnapshotTime] === undefined
) {
session[kSnapshotTime] = document.cursor.atClusterTime;
if (session?.[kSnapshotEnabled] && session[kSnapshotTime] === undefined) {
// find and aggregate commands return atClusterTime on the cursor
// distinct includes it in the response body
const atClusterTime = document.cursor?.atClusterTime || document.atClusterTime;
if (atClusterTime) {
session[kSnapshotTime] = atClusterTime;
}
}
}
26 changes: 1 addition & 25 deletions test/functional/sessions.test.js
Expand Up @@ -200,35 +200,11 @@ describe('Sessions - functional', function () {
for (const sessionTests of loadSpecTests(path.join('sessions', 'unified'))) {
expect(sessionTests).to.be.an('object');
context(String(sessionTests.description), function () {
// TODO: NODE-3393 fix test runner to apply session to all operations
const skipTestMap = {
'snapshot-sessions': [
'countDocuments operation with snapshot',
'Distinct operation with snapshot',
'Mixed operation with snapshot'
],
'snapshot-sessions-not-supported-client-error': [
'Client error on distinct with snapshot'
],
'snapshot-sessions-not-supported-server-error': [
'Server returns an error on distinct with snapshot'
],
'snapshot-sessions-unsupported-ops': [
'Server returns an error on listCollections with snapshot',
'Server returns an error on listDatabases with snapshot',
'Server returns an error on listIndexes with snapshot',
'Server returns an error on runCommand with snapshot',
'Server returns an error on findOneAndUpdate with snapshot',
'Server returns an error on deleteOne with snapshot',
'Server returns an error on updateOne with snapshot'
]
};
const testsToSkip = skipTestMap[sessionTests.description] || [];
for (const test of sessionTests.tests) {
it(String(test.description), {
metadata: { sessions: { skipLeakTests: true } },
test: async function () {
await runUnifiedTest(this, sessionTests, test, testsToSkip);
await runUnifiedTest(this, sessionTests, test);
}
});
}
Expand Down
122 changes: 42 additions & 80 deletions test/functional/unified-spec-runner/operations.ts
Expand Up @@ -4,14 +4,13 @@ import { Collection, Db, GridFSFile, MongoClient, ObjectId, AbstractCursor } fro
import { ReadConcern } from '../../../src/read_concern';
import { ReadPreference } from '../../../src/read_preference';
import { WriteConcern } from '../../../src/write_concern';
import { Document, InsertOneOptions } from '../../../src';
import { Document } from '../../../src';
import { EventCollector } from '../../tools/utils';
import { EntitiesMap } from './entities';
import { expectErrorCheck, resultCheck } from './match';
import type { OperationDescription } from './schema';
import { CommandStartedEvent } from '../../../src/cmap/command_monitoring_events';
import { translateOptions } from './unified-utils';
import { getSymbolFrom } from '../../tools/utils';

interface OperationFunctionParams {
client: MongoClient;
Expand All @@ -22,18 +21,6 @@ interface OperationFunctionParams {
type RunOperationFn = (p: OperationFunctionParams) => Promise<Document | boolean | number | void>;
export const operations = new Map<string, RunOperationFn>();

function executeWithPotentialSession(
entities: EntitiesMap,
operation: OperationDescription,
cursor: AbstractCursor
) {
const session = entities.getEntity('session', operation.arguments.session, false);
if (session) {
cursor.session = session;
}
return cursor.toArray();
}

operations.set('abortTransaction', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.object);
return session.abortTransaction();
Expand All @@ -44,18 +31,9 @@ operations.set('aggregate', async ({ entities, operation }) => {
if (!(dbOrCollection instanceof Db || dbOrCollection instanceof Collection)) {
throw new Error(`Operation object '${operation.object}' must be a db or collection`);
}
const cursor = dbOrCollection.aggregate(operation.arguments.pipeline, {
allowDiskUse: operation.arguments.allowDiskUse,
batchSize: operation.arguments.batchSize,
bypassDocumentValidation: operation.arguments.bypassDocumentValidation,
maxTimeMS: operation.arguments.maxTimeMS,
maxAwaitTimeMS: operation.arguments.maxAwaitTimeMS,
collation: operation.arguments.collation,
hint: operation.arguments.hint,
let: operation.arguments.let,
out: operation.arguments.out
});
return executeWithPotentialSession(entities, operation, cursor);
const { pipeline, ...opts } = operation.arguments;
const cursor = dbOrCollection.aggregate(pipeline, opts);
return cursor.toArray();
});

operations.set('assertCollectionExists', async ({ operation, client }) => {
Expand Down Expand Up @@ -139,27 +117,27 @@ operations.set('assertSameLsidOnLastTwoCommands', async ({ entities, operation }
});

operations.set('assertSessionDirty', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.serverSession.isDirty).to.be.true;
});

operations.set('assertSessionNotDirty', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.serverSession.isDirty).to.be.false;
});

operations.set('assertSessionPinned', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.transaction.isPinned).to.be.true;
});

operations.set('assertSessionUnpinned', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.transaction.isPinned).to.be.false;
});

operations.set('assertSessionTransactionState', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;

const transactionStateTranslation = {
none: 'NO_TRANSACTION',
Expand Down Expand Up @@ -236,17 +214,14 @@ operations.set('createChangeStream', async ({ entities, operation }) => {

operations.set('createCollection', async ({ entities, operation }) => {
const db = entities.getEntity('db', operation.object);
const { session, collection, ...opts } = operation.arguments;
await db.createCollection(collection, {
session: entities.getEntity('session', session, false),
...opts
});
const { collection, ...opts } = operation.arguments;
await db.createCollection(collection, opts);
});

operations.set('createFindCursor', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
const { filter, sort, batchSize, limit, let: vars } = operation.arguments;
const cursor = collection.find(filter, { sort, batchSize, limit, let: vars });
const { filter, ...opts } = operation.arguments;
const cursor = collection.find(filter, opts);
// The spec dictates that we create the cursor and force the find command
// to execute, but don't move the cursor forward. hasNext() accomplishes
// this.
Expand All @@ -256,11 +231,8 @@ operations.set('createFindCursor', async ({ entities, operation }) => {

operations.set('createIndex', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
const session = entities.getEntity('session', operation.arguments.session, false);
await collection.createIndex(operation.arguments.keys, {
session,
name: operation.arguments.name
});
const { keys, ...opts } = operation.arguments;
await collection.createIndex(keys, opts);
});

operations.set('deleteOne', async ({ entities, operation }) => {
Expand All @@ -271,7 +243,8 @@ operations.set('deleteOne', async ({ entities, operation }) => {

operations.set('dropCollection', async ({ entities, operation }) => {
const db = entities.getEntity('db', operation.object);
return await db.dropCollection(operation.arguments.collection);
const { collection, ...opts } = operation.arguments;
return await db.dropCollection(collection, opts);
});

operations.set('endSession', async ({ entities, operation }) => {
Expand All @@ -281,9 +254,8 @@ operations.set('endSession', async ({ entities, operation }) => {

operations.set('find', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
const { filter, sort, batchSize, limit, let: vars } = operation.arguments;
const cursor = collection.find(filter, { sort, batchSize, limit, let: vars });
return executeWithPotentialSession(entities, operation, cursor);
const { filter, ...opts } = operation.arguments;
return collection.find(filter, opts).toArray();
});

operations.set('findOneAndReplace', async ({ entities, operation }) => {
Expand All @@ -305,27 +277,14 @@ operations.set('failPoint', async ({ entities, operation }) => {

operations.set('insertOne', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);

const session = entities.getEntity('session', operation.arguments.session, false);

const options = {
session
} as InsertOneOptions;

return collection.insertOne(operation.arguments.document, options);
const { document, ...opts } = operation.arguments;
return collection.insertOne(document, opts);
});

operations.set('insertMany', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);

const session = entities.getEntity('session', operation.arguments.session, false);

const options = {
session,
ordered: operation.arguments.ordered ?? true
};

return collection.insertMany(operation.arguments.documents, options);
const { documents, ...opts } = operation.arguments;
return collection.insertMany(documents, opts);
});

operations.set('iterateUntilDocumentOrError', async ({ entities, operation }) => {
Expand All @@ -350,7 +309,7 @@ operations.set('listCollections', async ({ entities, operation }) => {

operations.set('listDatabases', async ({ entities, operation }) => {
const client = entities.getEntity('client', operation.object);
return client.db().admin().listDatabases();
return client.db().admin().listDatabases(operation.arguments);
});

operations.set('listIndexes', async ({ entities, operation }) => {
Expand All @@ -360,12 +319,8 @@ operations.set('listIndexes', async ({ entities, operation }) => {

operations.set('replaceOne', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.replaceOne(operation.arguments.filter, operation.arguments.replacement, {
bypassDocumentValidation: operation.arguments.bypassDocumentValidation,
collation: operation.arguments.collation,
hint: operation.arguments.hint,
upsert: operation.arguments.upsert
});
const { filter, replacement, ...opts } = operation.arguments;
return collection.replaceOne(filter, replacement, opts);
});

operations.set('startTransaction', async ({ entities, operation }) => {
Expand All @@ -374,7 +329,7 @@ operations.set('startTransaction', async ({ entities, operation }) => {
});

operations.set('targetedFailPoint', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.transaction.isPinned, 'Session must be pinned for a targetedFailPoint').to.be.true;
await entities.failPoints.enableFailPoint(
session.transaction._pinnedServer.s.description.hostAddress,
Expand Down Expand Up @@ -433,20 +388,20 @@ operations.set('withTransaction', async ({ entities, operation, client }) => {

operations.set('countDocuments', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.countDocuments(operation.arguments.filter as Document);
const { filter, ...opts } = operation.arguments;
return collection.countDocuments(filter, opts);
});

operations.set('deleteMany', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.deleteMany(operation.arguments.filter);
const { filter, ...opts } = operation.arguments;
return collection.deleteMany(filter, opts);
});

operations.set('distinct', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.distinct(
operation.arguments.fieldName as string,
operation.arguments.filter as Document
);
const { fieldName, filter, ...opts } = operation.arguments;
return collection.distinct(fieldName, filter, opts);
});

operations.set('estimatedDocumentCount', async ({ entities, operation }) => {
Expand All @@ -456,12 +411,14 @@ operations.set('estimatedDocumentCount', async ({ entities, operation }) => {

operations.set('findOneAndDelete', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.findOneAndDelete(operation.arguments.filter);
const { filter, ...opts } = operation.arguments;
return collection.findOneAndDelete(filter, opts);
});

operations.set('runCommand', async ({ entities, operation }: OperationFunctionParams) => {
const db = entities.getEntity('db', operation.object);
return db.command(operation.arguments.command);
const { command, ...opts } = operation.arguments;
return db.command(command, opts);
});

operations.set('updateMany', async ({ entities, operation }) => {
Expand All @@ -484,6 +441,11 @@ export async function executeOperationAndCheck(
const opFunc = operations.get(operation.name);
expect(opFunc, `Unknown operation: ${operation.name}`).to.exist;

if (operation.arguments?.session) {
const session = entities.getEntity('session', operation.arguments.session, false);
operation.arguments.session = session;
}

let result;

try {
Expand Down

0 comments on commit 7aa3008

Please sign in to comment.