Skip to content

Commit

Permalink
feat(NODE-3083): support aggregate writes on secondaries (#3022)
Browse files Browse the repository at this point in the history
Co-authored-by: Daria Pardue <daria.pardue@mongodb.com>
  • Loading branch information
durran and dariakp committed Nov 5, 2021
1 parent d67eae0 commit f696909
Show file tree
Hide file tree
Showing 15 changed files with 528 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/cmap/connection.ts
Expand Up @@ -97,6 +97,7 @@ export interface CommandOptions extends BSONSerializeOptions {
session?: ClientSession;
documentsReturnedIn?: string;
noResponse?: boolean;
omitReadPreference?: boolean;

// FIXME: NODE-2802
willRetryWrite?: boolean;
Expand Down
3 changes: 1 addition & 2 deletions src/operations/aggregate.ts
@@ -1,5 +1,4 @@
import { CommandOperation, CommandOperationOptions, CollationOptions } from './command';
import { ReadPreference } from '../read_preference';
import { MongoInvalidArgumentError } from '../error';
import { maxWireVersion, MongoDBNamespace } from '../utils';
import { Aspect, defineAspects, Hint } from './operation';
Expand Down Expand Up @@ -65,7 +64,7 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
}

if (this.hasWriteStage) {
this.readPreference = ReadPreference.primary;
this.trySecondaryWrite = true;
}

if (this.explain && this.writeConcern) {
Expand Down
5 changes: 5 additions & 0 deletions src/operations/command.ts
Expand Up @@ -10,6 +10,7 @@ import type { Server } from '../sdam/server';
import type { BSONSerializeOptions, Document } from '../bson';
import type { ReadConcernLike } from './../read_concern';
import { Explain, ExplainOptions } from '../explain';
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';

const SUPPORTS_WRITE_CONCERN_AND_COLLATION = 5;

Expand Down Expand Up @@ -126,6 +127,10 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
Object.assign(cmd, { readConcern: this.readConcern });
}

if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
options.omitReadPreference = true;
}

if (options.collation && serverWireVersion < SUPPORTS_WRITE_CONCERN_AND_COLLATION) {
callback(
new MongoCompatibilityError(
Expand Down
15 changes: 13 additions & 2 deletions src/operations/execute_operation.ts
Expand Up @@ -17,6 +17,7 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import type { Document } from '../bson';
import { supportsRetryableWrites } from '../utils';
import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
Expand Down Expand Up @@ -150,6 +151,16 @@ function executeWithServerSelection(
session.unpin();
}

let selector: ReadPreference | ServerSelector;

// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
if (operation.trySecondaryWrite) {
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
} else {
selector = readPreference;
}

const serverSelectionOptions = { session };
function callbackWithRetry(err?: any, result?: any) {
if (err == null) {
Expand Down Expand Up @@ -182,7 +193,7 @@ function executeWithServerSelection(
}

// select a new server, and attempt to retry the operation
topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => {
topology.selectServer(selector, serverSelectionOptions, (e?: any, server?: any) => {
if (
e ||
(operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) ||
Expand Down Expand Up @@ -227,7 +238,7 @@ function executeWithServerSelection(
}

// select a server, and execute the operation against it
topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => {
topology.selectServer(selector, serverSelectionOptions, (err?: any, server?: any) => {
if (err) {
callback(err);
return;
Expand Down
3 changes: 3 additions & 0 deletions src/operations/operation.ts
Expand Up @@ -31,6 +31,7 @@ export interface OperationOptions extends BSONSerializeOptions {

/** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */
bypassPinningCheck?: boolean;
omitReadPreference?: boolean;
}

/** @internal */
Expand All @@ -49,6 +50,7 @@ export abstract class AbstractOperation<TResult = any> {
readPreference: ReadPreference;
server!: Server;
bypassPinningCheck: boolean;
trySecondaryWrite: boolean;

// BSON serialization options
bsonOptions?: BSONSerializeOptions;
Expand All @@ -72,6 +74,7 @@ export abstract class AbstractOperation<TResult = any> {

this.options = options;
this.bypassPinningCheck = !!options.bypassPinningCheck;
this.trySecondaryWrite = false;
}

abstract execute(server: Server, session: ClientSession, callback: Callback<TResult>): void;
Expand Down
8 changes: 8 additions & 0 deletions src/sdam/server.ts
Expand Up @@ -299,6 +299,14 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// Clone the options
const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });

// There are cases where we need to flag the read preference not to get sent in
// the command, such as pre-5.0 servers attempting to perform an aggregate write
// with a non-primary read preference. In this case the effective read preference
// (primary) is not the same as the provided and must be removed completely.
if (finalOptions.omitReadPreference) {
delete finalOptions.readPreference;
}

// error if collation not supported
if (collationNotSupported(this, cmd)) {
callback(new MongoCompatibilityError(`Server ${this.name} does not support collation`));
Expand Down
25 changes: 25 additions & 0 deletions src/sdam/server_selection.ts
Expand Up @@ -8,6 +8,9 @@ import type { ServerDescription, TagSet } from './server_description';
const IDLE_WRITE_PERIOD = 10000;
const SMALLEST_MAX_STALENESS_SECONDS = 90;

// Minimum version to try writes on secondaries.
export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;

/** @public */
export type ServerSelector = (
topologyDescription: TopologyDescription,
Expand All @@ -28,6 +31,28 @@ export function writableServerSelector(): ServerSelector {
);
}

/**
* Returns a server selector that uses a read preference to select a
* server potentially for a write on a secondary.
*/
export function secondaryWritableServerSelector(
wireVersion?: number,
readPreference?: ReadPreference
): ServerSelector {
// If server version < 5.0, read preference always primary.
// If server version >= 5.0...
// - If read preference is supplied, use that.
// - If no read preference is supplied, use primary.
if (
!readPreference ||
!wireVersion ||
(wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)
) {
return readPreferenceServerSelector(ReadPreference.primary);
}
return readPreferenceServerSelector(readPreference);
}

/**
* Reduces the passed in array of servers by the rules of the "Max Staleness" specification
* found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst
Expand Down
4 changes: 4 additions & 0 deletions src/sdam/topology.ts
Expand Up @@ -797,6 +797,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return result;
}

get commonWireVersion(): number | undefined {
return this.description.commonWireVersion;
}

get logicalSessionTimeoutMinutes(): number | undefined {
return this.description.logicalSessionTimeoutMinutes;
}
Expand Down
6 changes: 1 addition & 5 deletions test/functional/crud_spec.test.js
Expand Up @@ -424,15 +424,11 @@ describe('CRUD spec v1', function () {
}
});

// TODO: Unskip when implementing NODE-3083.
const SKIP = ['aggregate-write-readPreference', 'db-aggregate-write-readPreference'];

describe('CRUD unified', function () {
for (const crudSpecTest of loadSpecTests('crud/unified')) {
expect(crudSpecTest).to.exist;
const testDescription = String(crudSpecTest.description);
const spec = SKIP.includes(testDescription) ? context.skip : context;
spec(testDescription, function () {
context(testDescription, function () {
for (const test of crudSpecTest.tests) {
it(String(test.description), {
metadata: { sessions: { skipLeakTests: true } },
Expand Down
8 changes: 5 additions & 3 deletions test/spec/crud/unified/aggregate-write-readPreference.json
@@ -1,6 +1,6 @@
{
"description": "aggregate-write-readPreference",
"schemaVersion": "1.3",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"minServerVersion": "3.6",
Expand Down Expand Up @@ -90,7 +90,8 @@
"description": "Aggregate with $out includes read preference for 5.0+ server",
"runOnRequirements": [
{
"minServerVersion": "5.0"
"minServerVersion": "5.0",
"serverless": "forbid"
}
],
"operations": [
Expand Down Expand Up @@ -181,7 +182,8 @@
"runOnRequirements": [
{
"minServerVersion": "4.2",
"maxServerVersion": "4.4.99"
"maxServerVersion": "4.4.99",
"serverless": "forbid"
}
],
"operations": [
Expand Down
4 changes: 3 additions & 1 deletion test/spec/crud/unified/aggregate-write-readPreference.yml
@@ -1,6 +1,6 @@
description: aggregate-write-readPreference

schemaVersion: '1.3'
schemaVersion: '1.4'

runOnRequirements:
# 3.6+ non-standalone is needed to utilize $readPreference in OP_MSG
Expand Down Expand Up @@ -59,6 +59,7 @@ tests:
- description: "Aggregate with $out includes read preference for 5.0+ server"
runOnRequirements:
- minServerVersion: "5.0"
serverless: "forbid"
operations:
- object: *collection0
name: aggregate
Expand Down Expand Up @@ -91,6 +92,7 @@ tests:
# drivers may avoid inheriting a client-level read concern for pre-4.2.
- minServerVersion: "4.2"
maxServerVersion: "4.4.99"
serverless: "forbid"
operations:
- object: *collection0
name: aggregate
Expand Down
6 changes: 4 additions & 2 deletions test/spec/crud/unified/db-aggregate-write-readPreference.json
Expand Up @@ -64,7 +64,8 @@
"description": "Database-level aggregate with $out includes read preference for 5.0+ server",
"runOnRequirements": [
{
"minServerVersion": "5.0"
"minServerVersion": "5.0",
"serverless": "forbid"
}
],
"operations": [
Expand Down Expand Up @@ -158,7 +159,8 @@
"runOnRequirements": [
{
"minServerVersion": "4.2",
"maxServerVersion": "4.4.99"
"maxServerVersion": "4.4.99",
"serverless": "forbid"
}
],
"operations": [
Expand Down
2 changes: 2 additions & 0 deletions test/spec/crud/unified/db-aggregate-write-readPreference.yml
Expand Up @@ -52,6 +52,7 @@ tests:
- description: "Database-level aggregate with $out includes read preference for 5.0+ server"
runOnRequirements:
- minServerVersion: "5.0"
serverless: "forbid"
operations:
- object: *database0
name: aggregate
Expand Down Expand Up @@ -85,6 +86,7 @@ tests:
# drivers may avoid inheriting a client-level read concern for pre-4.2.
- minServerVersion: "4.2"
maxServerVersion: "4.4.99"
serverless: "forbid"
operations:
- object: *database0
name: aggregate
Expand Down
72 changes: 72 additions & 0 deletions test/unit/operations/aggregate.test.js
@@ -0,0 +1,72 @@
'use strict';

const { expect } = require('chai');
const { AggregateOperation } = require('../../../src/operations/aggregate');

describe('AggregateOperation', function () {
const db = 'test';

describe('#constructor', function () {
context('when out is in the options', function () {
const operation = new AggregateOperation(db, [], { out: 'test', dbName: db });

it('sets trySecondaryWrite to true', function () {
expect(operation.trySecondaryWrite).to.be.true;
});
});

context('when $out is the last stage', function () {
const operation = new AggregateOperation(db, [{ $out: 'test' }], { dbName: db });

it('sets trySecondaryWrite to true', function () {
expect(operation.trySecondaryWrite).to.be.true;
});
});

context('when $out is not the last stage', function () {
const operation = new AggregateOperation(db, [{ $out: 'test' }, { $project: { name: 1 } }], {
dbName: db
});

it('sets trySecondaryWrite to false', function () {
expect(operation.trySecondaryWrite).to.be.false;
});
});

context('when $merge is the last stage', function () {
const operation = new AggregateOperation(db, [{ $merge: { into: 'test' } }], { dbName: db });

it('sets trySecondaryWrite to true', function () {
expect(operation.trySecondaryWrite).to.be.true;
});
});

context('when $merge is not the last stage', function () {
const operation = new AggregateOperation(
db,
[{ $merge: { into: 'test' } }, { $project: { name: 1 } }],
{ dbName: db }
);

it('sets trySecondaryWrite to false', function () {
expect(operation.trySecondaryWrite).to.be.false;
});
});

context('when no writable stages in empty pipeline', function () {
const operation = new AggregateOperation(db, [], { dbName: db });

it('sets trySecondaryWrite to false', function () {
expect(operation.trySecondaryWrite).to.be.false;
});
});

context('when no writable stages', function () {
const operation = new AggregateOperation(db, [{ $project: { name: 1 } }], { dbName: db });

it('sets trySecondaryWrite to false', function () {
expect(operation.trySecondaryWrite).to.be.false;
});
});
});
});

0 comments on commit f696909

Please sign in to comment.