Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-3392): enable snapshot reads on secondaries #2897

Merged
merged 16 commits into from Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/operations/execute_operation.ts
Expand Up @@ -86,6 +86,8 @@ export function executeOperation<
session = topology.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return cb(new MongoDriverError('Use of expired sessions is not permitted'));
} else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
return cb(new MongoDriverError('Snapshot reads require MongoDB 5.0 or later'));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to use a more specific error here now that they are available?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compat and InvalidArgument are still not available so I think leaving the DriverError in there would make sense for now.

}
} else if (session) {
// If the user passed an explicit session and we are still, after server selection,
Expand Down
6 changes: 5 additions & 1 deletion src/sdam/topology.ts
Expand Up @@ -379,7 +379,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return this.s.description;
}

capabilities(): ServerCapabilities {
get capabilities(): ServerCapabilities {
return new ServerCapabilities(this.lastIsMaster());
}

Expand Down Expand Up @@ -1064,6 +1064,10 @@ export class ServerCapabilities {
return this.maxWireVersion >= 3;
}

get supportsSnapshotReads(): boolean {
return this.maxWireVersion >= 13;
}

get commandsTakeWriteConcern(): boolean {
return this.maxWireVersion >= 5;
}
Expand Down
78 changes: 55 additions & 23 deletions src/sessions.ts
Expand Up @@ -30,6 +30,7 @@ import type { AbstractCursor } from './cursor/abstract_cursor';
import type { CommandOptions } from './cmap/connection';
import type { WriteConcern } from './write_concern';
import { TypedEventEmitter } from './mongo_types';
import { ReadConcernLevel } from './read_concern';

const minWireVersionForShardedTransactions = 8;

Expand All @@ -51,6 +52,8 @@ function assertAlive(session: ClientSession, callback?: Callback): boolean {
export interface ClientSessionOptions {
/** Whether causal consistency should be enabled on this session */
causalConsistency?: boolean;
/** Whether all read operations should be read from the same snapshot for this session (NOTE: not compatible with `causalConsistency=true`) */
snapshot?: boolean;
/** The default TransactionOptions to use for transactions started on this session. */
defaultTransactionOptions?: TransactionOptions;

Expand All @@ -72,14 +75,18 @@ export type ClientSessionEvents = {

/** @internal */
const kServerSession = Symbol('serverSession');
/** @internal */
const kSnapshotTime = Symbol('snapshotTime');
/** @internal */
const kSnapshotEnabled = Symbol('snapshotEnabled');

/**
* A class representing a client session on the server
*
* NOTE: not meant to be instantiated directly.
* @public
*/
class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
/** @internal */
topology: Topology;
/** @internal */
Expand All @@ -96,6 +103,14 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
transaction: Transaction;
/** @internal */
[kServerSession]?: ServerSession;
/** @internal */
[kSnapshotTime]?: Timestamp;
/** @internal */
[kSnapshotEnabled] = false;

get snapshotEnabled(): boolean {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
return this[kSnapshotEnabled];
}

/**
* Create a client session.
Expand Down Expand Up @@ -123,15 +138,23 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {

options = options ?? {};

if (options.snapshot === true) {
this[kSnapshotEnabled] = true;
if (options.causalConsistency === true) {
throw new MongoDriverError(
Copy link
Contributor Author

@dariakp dariakp Jul 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to use a more specific error here now that they are available?

'Properties "causalConsistency" and "snapshot" are mutually exclusive'
);
}
}

this.topology = topology;
this.sessionPool = sessionPool;
this.hasEnded = false;
this.clientOptions = clientOptions;
this[kServerSession] = undefined;

this.supports = {
causalConsistency:
typeof options.causalConsistency === 'boolean' ? options.causalConsistency : true
causalConsistency: options.snapshot !== true && options.causalConsistency !== false
};

this.clusterTime = options.initialClusterTime;
Expand Down Expand Up @@ -257,6 +280,10 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
* @param options - Options for the transaction
*/
startTransaction(options?: TransactionOptions): void {
if (this[kSnapshotEnabled]) {
throw new MongoDriverError('Transactions are not allowed with snapshot sessions');
Copy link
Contributor Author

@dariakp dariakp Jul 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to use a more specific error here now that they are available?

}

assertAlive(this);
if (this.inTransaction()) {
throw new MongoDriverError('Transaction already in progress');
Expand Down Expand Up @@ -623,7 +650,7 @@ export type ServerSessionId = { id: Binary };
* WARNING: not meant to be instantiated directly. For internal use only.
* @public
*/
class ServerSession {
export class ServerSession {
id: ServerSessionId;
lastUse: number;
txnNumber: number;
Expand Down Expand Up @@ -658,7 +685,7 @@ class ServerSession {
* For internal use only
* @internal
*/
class ServerSessionPool {
export class ServerSessionPool {
topology: Topology;
sessions: ServerSession[];

Expand Down Expand Up @@ -746,7 +773,7 @@ class ServerSessionPool {

// TODO: this should be codified in command construction
// @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
function commandSupportsReadConcern(command: Document, options?: Document): boolean {
export function commandSupportsReadConcern(command: Document, options?: Document): boolean {
if (command.aggregate || command.count || command.distinct || command.find || command.geoNear) {
return true;
}
Expand All @@ -770,7 +797,7 @@ function commandSupportsReadConcern(command: Document, options?: Document): bool
* @param command - the command to decorate
* @param options - Optional settings passed to calling operation
*/
function applySession(
export function applySession(
session: ClientSession,
command: Document,
options?: CommandOptions
Expand Down Expand Up @@ -801,28 +828,35 @@ function applySession(
// first apply non-transaction-specific sessions data
const inTransaction = session.inTransaction() || isTransactionCommand(command);
const isRetryableWrite = options?.willRetryWrite || false;
const shouldApplyReadConcern = commandSupportsReadConcern(command, options);

if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
command.txnNumber = Long.fromNumber(serverSession.txnNumber);
}

// now attempt to apply transaction-specific sessions data
if (!inTransaction) {
if (session.transaction.state !== TxnState.NO_TRANSACTION) {
session.transaction.transition(TxnState.NO_TRANSACTION);
}

// TODO: the following should only be applied to read operation per spec.
// for causal consistency
if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
if (
session.supports.causalConsistency &&
session.operationTime &&
commandSupportsReadConcern(command, options)
) {
command.readConcern = command.readConcern || {};
Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
} else if (session[kSnapshotEnabled]) {
command.readConcern = command.readConcern || { level: ReadConcernLevel.snapshot };
if (session[kSnapshotTime] !== undefined) {
Object.assign(command.readConcern, { atClusterTime: session[kSnapshotTime] });
}
}

return;
}

// now attempt to apply transaction-specific sessions data

// `autocommit` must always be false to differentiate from retryable writes
command.autocommit = false;

Expand All @@ -843,7 +877,7 @@ function applySession(
}
}

function updateSessionFromResponse(session: ClientSession, document: Document): void {
export function updateSessionFromResponse(session: ClientSession, document: Document): void {
if (document.$clusterTime) {
resolveClusterTime(session, document.$clusterTime);
}
Expand All @@ -855,14 +889,12 @@ function updateSessionFromResponse(session: ClientSession, document: Document):
if (document.recoveryToken && session && session.inTransaction()) {
session.transaction._recoveryToken = document.recoveryToken;
}
}

export {
ClientSession,
ServerSession,
ServerSessionPool,
TxnState,
applySession,
updateSessionFromResponse,
commandSupportsReadConcern
};
if (
document.cursor?.atClusterTime &&
session?.[kSnapshotEnabled] &&
session[kSnapshotTime] === undefined
) {
session[kSnapshotTime] = document.cursor.atClusterTime;
}
}
2 changes: 1 addition & 1 deletion src/utils.ts
Expand Up @@ -394,7 +394,7 @@ export function decorateWithCollation(
target: MongoClient | Db | Collection,
options: AnyOptions
): void {
const capabilities = getTopology(target).capabilities();
const capabilities = getTopology(target).capabilities;
if (options.collation && typeof options.collation === 'object') {
if (capabilities && capabilities.commandsTakeCollation) {
command.collation = options.collation;
Expand Down
51 changes: 44 additions & 7 deletions test/functional/sessions.test.js
@@ -1,11 +1,11 @@
'use strict';

const path = require('path');
const expect = require('chai').expect;
const setupDatabase = require('./shared').setupDatabase;
const withMonitoredClient = require('./shared').withMonitoredClient;
const TestRunnerContext = require('./spec-runner').TestRunnerContext;
const generateTopologyTests = require('./spec-runner').generateTopologyTests;
const loadSpecTests = require('../spec').loadSpecTests;
const { setupDatabase, withMonitoredClient } = require('./shared');
const { TestRunnerContext, generateTopologyTests } = require('./spec-runner');
const { loadSpecTests } = require('../spec');
const { runUnifiedTest } = require('./unified-spec-runner/runner');

const ignoredCommands = ['ismaster'];
const test = {
Expand Down Expand Up @@ -148,7 +148,7 @@ describe('Sessions - functional', function () {
}
});

describe('spec tests', function () {
describe('legacy spec tests', function () {
class SessionSpecTestContext extends TestRunnerContext {
assertSessionNotDirty(options) {
const session = options.session;
Expand Down Expand Up @@ -176,7 +176,7 @@ describe('Sessions - functional', function () {
}

const testContext = new SessionSpecTestContext();
const testSuites = loadSpecTests('sessions');
const testSuites = loadSpecTests(path.join('sessions', 'legacy'));

after(() => testContext.teardown());
before(function () {
Expand All @@ -196,6 +196,43 @@ describe('Sessions - functional', function () {
generateTopologyTests(testSuites, testContext, testFilter);
});

describe('unified spec tests', function () {
for (const sessionTests of loadSpecTests(path.join('sessions', 'unified'))) {
expect(sessionTests).to.be.an('object');
context(String(sessionTests.description), function () {
// TODO: NODE-3393 fix test runner to apply session to all operations
const skipTestMap = {
'snapshot-sessions': [
'countDocuments operation with snapshot',
'Distinct operation with snapshot',
'Mixed operation with snapshot'
],
'snapshot-sessions-not-supported-client-error': [
'Client error on distinct with snapshot'
],
'snapshot-sessions-not-supported-server-error': [
'Server returns an error on distinct with snapshot'
],
'snapshot-sessions-unsupported-ops': [
'Server returns an error on listCollections with snapshot',
'Server returns an error on listDatabases with snapshot',
'Server returns an error on listIndexes with snapshot',
'Server returns an error on runCommand with snapshot'
]
};
const testsToSkip = skipTestMap[sessionTests.description] || [];
for (const test of sessionTests.tests) {
it(String(test.description), {
metadata: { sessions: { skipLeakTests: true } },
test: async function () {
await runUnifiedTest(this, sessionTests, test, testsToSkip);
}
});
}
});
}
});

context('unacknowledged writes', () => {
it('should not include session for unacknowledged writes', {
metadata: { requires: { topology: 'single', mongodb: '>=3.6.0' } },
Expand Down
4 changes: 4 additions & 0 deletions test/functional/unified-spec-runner/entities.ts
Expand Up @@ -250,6 +250,10 @@ export class EntitiesMap<E = Entity> extends Map<string, E> {
options.causalConsistency = entity.session.sessionOptions?.causalConsistency;
}

if (entity.session.sessionOptions?.snapshot) {
options.snapshot = entity.session.sessionOptions.snapshot;
}

if (entity.session.sessionOptions?.defaultTransactionOptions) {
options.defaultTransactionOptions = Object.create(null);
const defaultOptions = entity.session.sessionOptions.defaultTransactionOptions;
Expand Down
6 changes: 3 additions & 3 deletions test/spec/read-write-concern/README.rst
@@ -1,6 +1,6 @@
=======================
Connection String Tests
=======================
============================
Read and Write Concern Tests
============================

The YAML and JSON files in this directory tree are platform-independent tests
that drivers can use to prove their conformance to the Read and Write Concern
Expand Down
26 changes: 23 additions & 3 deletions test/spec/sessions/README.rst
Expand Up @@ -9,10 +9,11 @@ Driver Session Tests
Introduction
============

The YAML and JSON files in this directory are platform-independent tests that
drivers can use to prove their conformance to the Driver Sessions Spec. They are
The YAML and JSON files in the ``legacy`` and ``unified`` sub-directories are platform-independent tests
that drivers can use to prove their conformance to the Driver Sessions Spec. They are
designed with the intention of sharing most test-runner code with the
Transactions spec tests.
`Transactions Spec tests <../../transactions/tests/README.rst#test-format>`_.. Tests in the
``unified`` directory are written using the `Unified Test Format <../../unified-test-format/unified-test-format.rst>`_.

Several prose tests, which are not easily expressed in YAML, are also presented
in the Driver Sessions Spec. Those tests will need to be manually implemented
Expand Down Expand Up @@ -78,7 +79,26 @@ the given session is *not* marked dirty::
arguments:
session: session0

Snapshot session tests
======================
Snapshot sessions tests require server of version 5.0 or higher and
replica set or a sharded cluster deployment.
Default snapshot history window on the server is 5 minutes. Running the test in debug mode, or in any other slow configuration
may lead to `SnapshotTooOld` errors. Drivers can work around this issue by increasing the server's `minSnapshotHistoryWindowInSeconds` parameter, for example:

.. code:: python

client.admin.command('setParameter', 1, minSnapshotHistoryWindowInSeconds=60)

Prose tests
```````````
- Setting both ``snapshot`` and ``causalConsistency`` is not allowed

* ``client.startSession(snapshot = true, causalConsistency = true)``
* Assert that an error was raised by driver

Changelog
=========

:2019-05-15: Initial version.
:2021-06-15: Added snapshot-session tests. Introduced legacy and unified folders.