From 9e05730dcd12db124d8fccce7b052c6d27c47271 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 2 Nov 2021 13:40:26 +0100 Subject: [PATCH 1/7] feat(NODE-3083): support aggregate writes on secondaries --- src/operations/aggregate.ts | 3 +- src/operations/execute_operation.ts | 13 +++- src/operations/operation.ts | 1 + src/sdam/server_selection.ts | 21 ++++++ src/sdam/topology.ts | 4 ++ test/functional/crud_spec.test.js | 6 +- test/unit/operations/aggregate.test.js | 42 +++++++++++ test/unit/sdam/server_selection.test.js | 94 +++++++++++++++++++++++++ 8 files changed, 176 insertions(+), 8 deletions(-) create mode 100644 test/unit/operations/aggregate.test.js create mode 100644 test/unit/sdam/server_selection.test.js diff --git a/src/operations/aggregate.ts b/src/operations/aggregate.ts index 0685400670..ca899a896b 100644 --- a/src/operations/aggregate.ts +++ b/src/operations/aggregate.ts @@ -1,5 +1,4 @@ import { CommandOperation, CommandOperationOptions, CollationOptions } from './command'; -import { ReadPreference } from '../read_preference'; import { MongoInvalidArgumentError } from '../error'; import { maxWireVersion, MongoDBNamespace } from '../utils'; import { Aspect, defineAspects, Hint } from './operation'; @@ -65,7 +64,7 @@ export class AggregateOperation extends CommandOperation { } if (this.hasWriteStage) { - this.readPreference = ReadPreference.primary; + this.trySecondaryWrite = true; } if (this.explain && this.writeConcern) { diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 780978324d..0bd2dc20b6 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -17,6 +17,7 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import type { Document } from '../bson'; import { supportsRetryableWrites } from '../utils'; +import { secondaryWritableServerSelector } from '../sdam/server_selection'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = @@ -181,8 +182,18 @@ function executeWithServerSelection( return; } + let selector; + + // If operation should try to write to secondary use the custom server selector + // otherwise provide the read preference. + if (operation.trySecondaryWrite) { + selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference); + } else { + selector = readPreference; + } + // select a new server, and attempt to retry the operation - topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => { + topology.selectServer(selector, serverSelectionOptions, (e?: any, server?: any) => { if ( e || (operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) || diff --git a/src/operations/operation.ts b/src/operations/operation.ts index c503b8180c..1142b6cf9e 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -49,6 +49,7 @@ export abstract class AbstractOperation { readPreference: ReadPreference; server!: Server; bypassPinningCheck: boolean; + trySecondaryWrite = false; // BSON serialization options bsonOptions?: BSONSerializeOptions; diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 390da9693d..444d83ecae 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -8,6 +8,9 @@ import type { ServerDescription, TagSet } from './server_description'; const IDLE_WRITE_PERIOD = 10000; const SMALLEST_MAX_STALENESS_SECONDS = 90; +// Minimum version to try writes on secondaries. +export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13; + /** @public */ export type ServerSelector = ( topologyDescription: TopologyDescription, @@ -28,6 +31,24 @@ export function writableServerSelector(): ServerSelector { ); } +/** + * Returns a server selector that uses a read preference to select a + * server potentially for a write on a secondary. + */ +export function secondaryWritableServerSelector( + wireVersion?: number, + readPreference?: ReadPreference +): ServerSelector { + // If server version < 5.0, read preference always primary. + // If server version >= 5.0... + // - If read preference is supplied, use that. + // - If no read preference is supplied, use primary. + if (!readPreference || (wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)) { + return readPreferenceServerSelector(ReadPreference.primary); + } + return readPreferenceServerSelector(readPreference); +} + /** * Reduces the passed in array of servers by the rules of the "Max Staleness" specification * found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 67f81db609..fa2551faa5 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -797,6 +797,10 @@ export class Topology extends TypedEventEmitter { return result; } + get commonWireVersion(): number | undefined { + return this.description.commonWireVersion; + } + get logicalSessionTimeoutMinutes(): number | undefined { return this.description.logicalSessionTimeoutMinutes; } diff --git a/test/functional/crud_spec.test.js b/test/functional/crud_spec.test.js index c3e4b41bfe..4e2b8e2782 100644 --- a/test/functional/crud_spec.test.js +++ b/test/functional/crud_spec.test.js @@ -424,15 +424,11 @@ describe('CRUD spec v1', function () { } }); -// TODO: Unskip when implementing NODE-3083. -const SKIP = ['aggregate-write-readPreference', 'db-aggregate-write-readPreference']; - describe('CRUD unified', function () { for (const crudSpecTest of loadSpecTests('crud/unified')) { expect(crudSpecTest).to.exist; const testDescription = String(crudSpecTest.description); - const spec = SKIP.includes(testDescription) ? context.skip : context; - spec(testDescription, function () { + context(testDescription, function () { for (const test of crudSpecTest.tests) { it(String(test.description), { metadata: { sessions: { skipLeakTests: true } }, diff --git a/test/unit/operations/aggregate.test.js b/test/unit/operations/aggregate.test.js new file mode 100644 index 0000000000..74b55f745d --- /dev/null +++ b/test/unit/operations/aggregate.test.js @@ -0,0 +1,42 @@ +'use strict'; + +const { expect } = require('chai'); +const { AggregateOperation } = require('../../../src/operations/aggregate'); + +describe('AggregateOperation', function () { + const db = 'test'; + + describe('#constructor', function () { + context('when out is in the options', function () { + const operation = new AggregateOperation(db, [], { out: 'test', dbName: db }); + + it('sets trySecondaryWrite to true', function () { + expect(operation.trySecondaryWrite).to.be.true; + }); + }); + + context('when $out is the last stage', function () { + const operation = new AggregateOperation(db, [{ $out: 'test' }], { dbName: db }); + + it('sets trySecondaryWrite to true', function () { + expect(operation.trySecondaryWrite).to.be.true; + }); + }); + + context('when $merge is the last stage', function () { + const operation = new AggregateOperation(db, [{ $merge: { into: 'test' } }], { dbName: db }); + + it('sets trySecondaryWrite to true', function () { + expect(operation.trySecondaryWrite).to.be.true; + }); + }); + + context('when no writable stages', function () { + const operation = new AggregateOperation(db, [], { dbName: db }); + + it('sets trySecondaryWrite to false', function () { + expect(operation.trySecondaryWrite).to.be.false; + }); + }); + }); +}); diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js new file mode 100644 index 0000000000..c64b301216 --- /dev/null +++ b/test/unit/sdam/server_selection.test.js @@ -0,0 +1,94 @@ +'use strict'; + +const { expect } = require('chai'); +const { ObjectId } = require('../../../src/bson'); +const { ReadPreference } = require('../../../src/read_preference'); +const { + secondaryWritableServerSelector, + MIN_SECONDARY_WRITE_WIRE_VERSION +} = require('../../../src/sdam/server_selection'); +const { ServerDescription } = require('../../../src/sdam/server_description'); +const { TopologyDescription } = require('../../../src/sdam/topology_description'); +const { TopologyType } = require('../../../src/sdam/common'); + +describe('ServerSelector', function () { + describe('#secondaryWritableServerSelector', function () { + const primary = new ServerDescription('127.0.0.1:27017', { + setName: 'test', + isWritablePrimary: true, + ok: 1 + }); + const secondary = new ServerDescription('127.0.0.1:27018', { + setName: 'test', + secondary: true, + ok: 1 + }); + const serverDescriptions = new Map(); + serverDescriptions.set('127.0.0.1:27017', primary); + serverDescriptions.set('127.0.0.1:27018', secondary); + + context('when the common server version is >= 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('uses the provided read preference', function () { + expect(server).to.deep.equal([secondary]); + }); + }); + + context('when a read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a primary', function () { + expect(server).to.deep.equal([primary]); + }); + }); + }); + + context('when the common server version is < 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION - 1 + ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a primary', function () { + expect(server).to.deep.equal([primary]); + }); + }); + + context('when read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a primary', function () { + expect(server).to.deep.equal([primary]); + }); + }); + }); + }); +}); From 960cfe601ae2e33763bc50f8393a95025385f3c3 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 2 Nov 2021 14:06:29 +0100 Subject: [PATCH 2/7] test: add test for no wire version provided --- test/unit/sdam/server_selection.test.js | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index c64b301216..6e6c799ca3 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -90,5 +90,22 @@ describe('ServerSelector', function () { }); }); }); + + context('when a common wire version is not provided', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + const selector = secondaryWritableServerSelector(); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a primary', function () { + expect(server).to.deep.equal([primary]); + }); + }); }); }); From 4e2d1687b03b64c12bff56d1370951a8d03d16ee Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 2 Nov 2021 14:35:16 +0100 Subject: [PATCH 3/7] test(NODE-3684): skip tests on serverless --- src/operations/execute_operation.ts | 24 +++++++++---------- src/sdam/server_selection.ts | 2 ++ .../aggregate-write-readPreference.json | 8 ++++--- .../aggregate-write-readPreference.yml | 4 +++- .../db-aggregate-write-readPreference.json | 6 +++-- .../db-aggregate-write-readPreference.yml | 2 ++ 6 files changed, 28 insertions(+), 18 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 0bd2dc20b6..4ac0a368e9 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -17,7 +17,7 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import type { Document } from '../bson'; import { supportsRetryableWrites } from '../utils'; -import { secondaryWritableServerSelector } from '../sdam/server_selection'; +import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = @@ -151,6 +151,16 @@ function executeWithServerSelection( session.unpin(); } + let selector: ReadPreference | ServerSelector; + + // If operation should try to write to secondary use the custom server selector + // otherwise provide the read preference. + if (operation.trySecondaryWrite) { + selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference); + } else { + selector = readPreference; + } + const serverSelectionOptions = { session }; function callbackWithRetry(err?: any, result?: any) { if (err == null) { @@ -182,16 +192,6 @@ function executeWithServerSelection( return; } - let selector; - - // If operation should try to write to secondary use the custom server selector - // otherwise provide the read preference. - if (operation.trySecondaryWrite) { - selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference); - } else { - selector = readPreference; - } - // select a new server, and attempt to retry the operation topology.selectServer(selector, serverSelectionOptions, (e?: any, server?: any) => { if ( @@ -238,7 +238,7 @@ function executeWithServerSelection( } // select a server, and execute the operation against it - topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => { + topology.selectServer(selector, serverSelectionOptions, (err?: any, server?: any) => { if (err) { callback(err); return; diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 444d83ecae..95010d66f9 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -43,6 +43,8 @@ export function secondaryWritableServerSelector( // If server version >= 5.0... // - If read preference is supplied, use that. // - If no read preference is supplied, use primary. + /* eslint no-console: 0 */ + console.log('select', readPreference, wireVersion); if (!readPreference || (wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)) { return readPreferenceServerSelector(ReadPreference.primary); } diff --git a/test/spec/crud/unified/aggregate-write-readPreference.json b/test/spec/crud/unified/aggregate-write-readPreference.json index 44680fb1ff..bc887e83cb 100644 --- a/test/spec/crud/unified/aggregate-write-readPreference.json +++ b/test/spec/crud/unified/aggregate-write-readPreference.json @@ -1,6 +1,6 @@ { "description": "aggregate-write-readPreference", - "schemaVersion": "1.3", + "schemaVersion": "1.4", "runOnRequirements": [ { "minServerVersion": "3.6", @@ -90,7 +90,8 @@ "description": "Aggregate with $out includes read preference for 5.0+ server", "runOnRequirements": [ { - "minServerVersion": "5.0" + "minServerVersion": "5.0", + "serverless": "forbid" } ], "operations": [ @@ -181,7 +182,8 @@ "runOnRequirements": [ { "minServerVersion": "4.2", - "maxServerVersion": "4.4.99" + "maxServerVersion": "4.4.99", + "serverless": "forbid" } ], "operations": [ diff --git a/test/spec/crud/unified/aggregate-write-readPreference.yml b/test/spec/crud/unified/aggregate-write-readPreference.yml index 424713161a..86f5a4399c 100644 --- a/test/spec/crud/unified/aggregate-write-readPreference.yml +++ b/test/spec/crud/unified/aggregate-write-readPreference.yml @@ -1,6 +1,6 @@ description: aggregate-write-readPreference -schemaVersion: '1.3' +schemaVersion: '1.4' runOnRequirements: # 3.6+ non-standalone is needed to utilize $readPreference in OP_MSG @@ -59,6 +59,7 @@ tests: - description: "Aggregate with $out includes read preference for 5.0+ server" runOnRequirements: - minServerVersion: "5.0" + serverless: "forbid" operations: - object: *collection0 name: aggregate @@ -91,6 +92,7 @@ tests: # drivers may avoid inheriting a client-level read concern for pre-4.2. - minServerVersion: "4.2" maxServerVersion: "4.4.99" + serverless: "forbid" operations: - object: *collection0 name: aggregate diff --git a/test/spec/crud/unified/db-aggregate-write-readPreference.json b/test/spec/crud/unified/db-aggregate-write-readPreference.json index 2a20542390..2a81282de8 100644 --- a/test/spec/crud/unified/db-aggregate-write-readPreference.json +++ b/test/spec/crud/unified/db-aggregate-write-readPreference.json @@ -64,7 +64,8 @@ "description": "Database-level aggregate with $out includes read preference for 5.0+ server", "runOnRequirements": [ { - "minServerVersion": "5.0" + "minServerVersion": "5.0", + "serverless": "forbid" } ], "operations": [ @@ -158,7 +159,8 @@ "runOnRequirements": [ { "minServerVersion": "4.2", - "maxServerVersion": "4.4.99" + "maxServerVersion": "4.4.99", + "serverless": "forbid" } ], "operations": [ diff --git a/test/spec/crud/unified/db-aggregate-write-readPreference.yml b/test/spec/crud/unified/db-aggregate-write-readPreference.yml index a79eb0650a..04a3b2169f 100644 --- a/test/spec/crud/unified/db-aggregate-write-readPreference.yml +++ b/test/spec/crud/unified/db-aggregate-write-readPreference.yml @@ -52,6 +52,7 @@ tests: - description: "Database-level aggregate with $out includes read preference for 5.0+ server" runOnRequirements: - minServerVersion: "5.0" + serverless: "forbid" operations: - object: *database0 name: aggregate @@ -85,6 +86,7 @@ tests: # drivers may avoid inheriting a client-level read concern for pre-4.2. - minServerVersion: "4.2" maxServerVersion: "4.4.99" + serverless: "forbid" operations: - object: *database0 name: aggregate From 6529f20dd9a2cffdbc4ba0844e0a98c84ab1933c Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 3 Nov 2021 09:41:41 +0100 Subject: [PATCH 4/7] test: remove read preference pre 5.0 --- src/cmap/connection.ts | 1 + src/operations/command.ts | 5 + src/operations/operation.ts | 4 +- src/sdam/server.ts | 8 + src/sdam/server_selection.ts | 2 - test/unit/sdam/server_selection.test.js | 402 ++++++++++++++++++++---- 6 files changed, 353 insertions(+), 69 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index e7b6e52d22..ccee670fc0 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -97,6 +97,7 @@ export interface CommandOptions extends BSONSerializeOptions { session?: ClientSession; documentsReturnedIn?: string; noResponse?: boolean; + omitReadPreference?: boolean; // FIXME: NODE-2802 willRetryWrite?: boolean; diff --git a/src/operations/command.ts b/src/operations/command.ts index 70ae413d65..0d5ae4e1b0 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -10,6 +10,7 @@ import type { Server } from '../sdam/server'; import type { BSONSerializeOptions, Document } from '../bson'; import type { ReadConcernLike } from './../read_concern'; import { Explain, ExplainOptions } from '../explain'; +import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection'; const SUPPORTS_WRITE_CONCERN_AND_COLLATION = 5; @@ -126,6 +127,10 @@ export abstract class CommandOperation extends AbstractOperation { Object.assign(cmd, { readConcern: this.readConcern }); } + if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) { + options.omitReadPreference = true; + } + if (options.collation && serverWireVersion < SUPPORTS_WRITE_CONCERN_AND_COLLATION) { callback( new MongoCompatibilityError( diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 1142b6cf9e..66198da456 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -31,6 +31,7 @@ export interface OperationOptions extends BSONSerializeOptions { /** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */ bypassPinningCheck?: boolean; + omitReadPreference?: boolean; } /** @internal */ @@ -49,7 +50,7 @@ export abstract class AbstractOperation { readPreference: ReadPreference; server!: Server; bypassPinningCheck: boolean; - trySecondaryWrite = false; + trySecondaryWrite: boolean; // BSON serialization options bsonOptions?: BSONSerializeOptions; @@ -73,6 +74,7 @@ export abstract class AbstractOperation { this.options = options; this.bypassPinningCheck = !!options.bypassPinningCheck; + this.trySecondaryWrite = false; } abstract execute(server: Server, session: ClientSession, callback: Callback): void; diff --git a/src/sdam/server.ts b/src/sdam/server.ts index e3f1fb4096..393e2a72ba 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -299,6 +299,14 @@ export class Server extends TypedEventEmitter { // Clone the options const finalOptions = Object.assign({}, options, { wireProtocolCommand: false }); + // There are cases where we need to flag the read preference not to get sent in + // the command, such as pre-5.0 servers attempting to perform an aggregate write + // with a non-primary read preference. In this case the effective read preference + // (primary) is not the same as the provided and must be removed completely. + if (finalOptions.omitReadPreference) { + delete finalOptions.readPreference; + } + // error if collation not supported if (collationNotSupported(this, cmd)) { callback(new MongoCompatibilityError(`Server ${this.name} does not support collation`)); diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 95010d66f9..444d83ecae 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -43,8 +43,6 @@ export function secondaryWritableServerSelector( // If server version >= 5.0... // - If read preference is supplied, use that. // - If no read preference is supplied, use primary. - /* eslint no-console: 0 */ - console.log('select', readPreference, wireVersion); if (!readPreference || (wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)) { return readPreferenceServerSelector(ReadPreference.primary); } diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index 6e6c799ca3..715615a4be 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -11,46 +11,107 @@ const { ServerDescription } = require('../../../src/sdam/server_description'); const { TopologyDescription } = require('../../../src/sdam/topology_description'); const { TopologyType } = require('../../../src/sdam/common'); -describe('ServerSelector', function () { +describe('server selection', function () { + const primary = new ServerDescription('127.0.0.1:27017', { + setName: 'test', + isWritablePrimary: true, + ok: 1 + }); + const secondary = new ServerDescription('127.0.0.1:27018', { + setName: 'test', + secondary: true, + ok: 1 + }); + const mongos = new ServerDescription('127.0.0.1:27019', { + msg: 'isdbgrid', + ok: 1 + }); + const loadBalancer = new ServerDescription('127.0.0.1:27020', { ok: 1 }, { loadBalanced: true }); + const single = new ServerDescription('127.0.0.1:27021', { + isWritablePrimary: true, + ok: 1 + }); + describe('#secondaryWritableServerSelector', function () { - const primary = new ServerDescription('127.0.0.1:27017', { - setName: 'test', - isWritablePrimary: true, - ok: 1 - }); - const secondary = new ServerDescription('127.0.0.1:27018', { - setName: 'test', - secondary: true, - ok: 1 - }); - const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27017', primary); - serverDescriptions.set('127.0.0.1:27018', secondary); - - context('when the common server version is >= 5.0', function () { - const topologyDescription = new TopologyDescription( - TopologyType.ReplicaSetWithPrimary, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - - context('when a read preference is provided', function () { - const selector = secondaryWritableServerSelector( + context('when the topology is a replica set', function () { + const serverDescriptions = new Map(); + serverDescriptions.set('127.0.0.1:27017', primary); + serverDescriptions.set('127.0.0.1:27018', secondary); + + context('when the common server version is >= 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', MIN_SECONDARY_WRITE_WIRE_VERSION, - ReadPreference.secondary + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); - it('uses the provided read preference', function () { - expect(server).to.deep.equal([secondary]); + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('uses the provided read preference', function () { + expect(server).to.deep.equal([secondary]); + }); + }); + + context('when a read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a primary', function () { + expect(server).to.deep.equal([primary]); + }); }); }); - context('when a read preference is not provided', function () { - const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); + context('when the common server version is < 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION - 1 + ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a primary', function () { + expect(server).to.deep.equal([primary]); + }); + }); + + context('when read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a primary', function () { + expect(server).to.deep.equal([primary]); + }); + }); + }); + + context('when a common wire version is not provided', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + const selector = secondaryWritableServerSelector(); const server = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { @@ -59,52 +120,261 @@ describe('ServerSelector', function () { }); }); - context('when the common server version is < 5.0', function () { - const topologyDescription = new TopologyDescription( - TopologyType.ReplicaSetWithPrimary, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION - 1, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - 1 - ); - - context('when a read preference is provided', function () { - const selector = secondaryWritableServerSelector( + context('when the topology is sharded', function () { + const serverDescriptions = new Map(); + serverDescriptions.set('127.0.0.1:27019', mongos); + + context('when the common server version is >= 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.Sharded, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a mongos', function () { + expect(server).to.deep.equal([mongos]); + }); + }); + + context('when a read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a mongos', function () { + expect(server).to.deep.equal([mongos]); + }); + }); + }); + + context('when the common server version is < 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.Sharded, + serverDescriptions, + 'test', MIN_SECONDARY_WRITE_WIRE_VERSION - 1, - ReadPreference.secondary + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION - 1 ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a mongos', function () { + expect(server).to.deep.equal([mongos]); + }); + }); + + context('when read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a mongos', function () { + expect(server).to.deep.equal([mongos]); + }); + }); + }); + + context('when a common wire version is not provided', function () { + const topologyDescription = new TopologyDescription( + TopologyType.Sharded, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + const selector = secondaryWritableServerSelector(); const server = selector(topologyDescription, Array.from(serverDescriptions.values())); - it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + it('selects a mongos', function () { + expect(server).to.deep.equal([mongos]); + }); + }); + }); + + context('when the topology is load balanced', function () { + const serverDescriptions = new Map(); + serverDescriptions.set('127.0.0.1:27020', loadBalancer); + + context('when the common server version is >= 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.LoadBalanced, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a load balancer', function () { + expect(server).to.deep.equal([loadBalancer]); + }); + }); + + context('when a read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a load balancer', function () { + expect(server).to.deep.equal([loadBalancer]); + }); }); }); - context('when read preference is not provided', function () { - const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); + context('when the common server version is < 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.LoadBalanced, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION - 1 + ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a load balancer', function () { + expect(server).to.deep.equal([loadBalancer]); + }); + }); + + context('when read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a load balancer', function () { + expect(server).to.deep.equal([loadBalancer]); + }); + }); + }); + + context('when a common wire version is not provided', function () { + const topologyDescription = new TopologyDescription( + TopologyType.LoadBalanced, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + const selector = secondaryWritableServerSelector(); const server = selector(topologyDescription, Array.from(serverDescriptions.values())); - it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + it('selects a load balancer', function () { + expect(server).to.deep.equal([loadBalancer]); }); }); }); - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.ReplicaSetWithPrimary, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + context('when the topology is single', function () { + const serverDescriptions = new Map(); + serverDescriptions.set('127.0.0.1:27020', single); + + context('when the common server version is >= 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.Single, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a standalone', function () { + expect(server).to.deep.equal([single]); + }); + }); + + context('when a read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a standalone', function () { + expect(server).to.deep.equal([single]); + }); + }); + }); + + context('when the common server version is < 5.0', function () { + const topologyDescription = new TopologyDescription( + TopologyType.Single, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION - 1 + ); + + context('when a read preference is provided', function () { + const selector = secondaryWritableServerSelector( + MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + ReadPreference.secondary + ); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a standalone', function () { + expect(server).to.deep.equal([single]); + }); + }); + + context('when read preference is not provided', function () { + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a standalone', function () { + expect(server).to.deep.equal([single]); + }); + }); + }); + + context('when a common wire version is not provided', function () { + const topologyDescription = new TopologyDescription( + TopologyType.Single, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + const selector = secondaryWritableServerSelector(); + const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('selects a standalone', function () { + expect(server).to.deep.equal([single]); + }); }); }); }); From 01fdeca6f1c9389c90c08bbdc0148e7aeba2d033 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 4 Nov 2021 10:25:54 +0100 Subject: [PATCH 5/7] test: expand secondary write unit testing --- src/sdam/server_selection.ts | 6 ++++- test/unit/operations/aggregate.test.js | 32 ++++++++++++++++++++++++- test/unit/sdam/server_selection.test.js | 6 ++--- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 444d83ecae..a3248dfaa0 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -43,7 +43,11 @@ export function secondaryWritableServerSelector( // If server version >= 5.0... // - If read preference is supplied, use that. // - If no read preference is supplied, use primary. - if (!readPreference || (wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)) { + if ( + !readPreference || + !wireVersion || + (wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) + ) { return readPreferenceServerSelector(ReadPreference.primary); } return readPreferenceServerSelector(readPreference); diff --git a/test/unit/operations/aggregate.test.js b/test/unit/operations/aggregate.test.js index 74b55f745d..bb6b4544e4 100644 --- a/test/unit/operations/aggregate.test.js +++ b/test/unit/operations/aggregate.test.js @@ -23,6 +23,16 @@ describe('AggregateOperation', function () { }); }); + context('when $out is not the last stage', function () { + const operation = new AggregateOperation(db, [{ $out: 'test' }, { $project: { name: 1 } }], { + dbName: db + }); + + it('sets trySecondaryWrite to false', function () { + expect(operation.trySecondaryWrite).to.be.false; + }); + }); + context('when $merge is the last stage', function () { const operation = new AggregateOperation(db, [{ $merge: { into: 'test' } }], { dbName: db }); @@ -31,12 +41,32 @@ describe('AggregateOperation', function () { }); }); - context('when no writable stages', function () { + context('when $merge is not the last stage', function () { + const operation = new AggregateOperation( + db, + [{ $merge: { into: 'test' } }, { $project: { name: 1 } }], + { dbName: db } + ); + + it('sets trySecondaryWrite to false', function () { + expect(operation.trySecondaryWrite).to.be.false; + }); + }); + + context('when no writable stages in empty pipeline', function () { const operation = new AggregateOperation(db, [], { dbName: db }); it('sets trySecondaryWrite to false', function () { expect(operation.trySecondaryWrite).to.be.false; }); }); + + context('when no writable stages', function () { + const operation = new AggregateOperation(db, [{ $project: { name: 1 } }], { dbName: db }); + + it('sets trySecondaryWrite to false', function () { + expect(operation.trySecondaryWrite).to.be.false; + }); + }); }); }); diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index 715615a4be..eb4b6f82fc 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -11,7 +11,7 @@ const { ServerDescription } = require('../../../src/sdam/server_description'); const { TopologyDescription } = require('../../../src/sdam/topology_description'); const { TopologyType } = require('../../../src/sdam/common'); -describe('server selection', function () { +describe.only('server selection', function () { const primary = new ServerDescription('127.0.0.1:27017', { setName: 'test', isWritablePrimary: true, @@ -75,7 +75,7 @@ describe('server selection', function () { TopologyType.ReplicaSetWithPrimary, serverDescriptions, 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + MIN_SECONDARY_WRITE_WIRE_VERSION, new ObjectId(), MIN_SECONDARY_WRITE_WIRE_VERSION - 1 ); @@ -111,7 +111,7 @@ describe('server selection', function () { new ObjectId(), MIN_SECONDARY_WRITE_WIRE_VERSION ); - const selector = secondaryWritableServerSelector(); + const selector = secondaryWritableServerSelector(undefined, ReadPreference.secondary); const server = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { From f273f4f2c4eb847acd699753c1b7e2584e5e9385 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 4 Nov 2021 19:00:14 +0100 Subject: [PATCH 6/7] Update test/unit/sdam/server_selection.test.js Co-authored-by: Daria Pardue --- test/unit/sdam/server_selection.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index eb4b6f82fc..382c769ecf 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -11,7 +11,7 @@ const { ServerDescription } = require('../../../src/sdam/server_description'); const { TopologyDescription } = require('../../../src/sdam/topology_description'); const { TopologyType } = require('../../../src/sdam/common'); -describe.only('server selection', function () { +describe('server selection', function () { const primary = new ServerDescription('127.0.0.1:27017', { setName: 'test', isWritablePrimary: true, From 17b93fdd97afdc195d00b248f65563ee1d355d0a Mon Sep 17 00:00:00 2001 From: Daria Pardue Date: Thu, 4 Nov 2021 16:01:00 -0400 Subject: [PATCH 7/7] Apply suggestions from code review --- test/unit/sdam/server_selection.test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index 382c769ecf..272183a6b7 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -161,7 +161,7 @@ describe('server selection', function () { TopologyType.Sharded, serverDescriptions, 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + MIN_SECONDARY_WRITE_WIRE_VERSION, new ObjectId(), MIN_SECONDARY_WRITE_WIRE_VERSION - 1 ); @@ -247,7 +247,7 @@ describe('server selection', function () { TopologyType.LoadBalanced, serverDescriptions, 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + MIN_SECONDARY_WRITE_WIRE_VERSION, new ObjectId(), MIN_SECONDARY_WRITE_WIRE_VERSION - 1 ); @@ -333,7 +333,7 @@ describe('server selection', function () { TopologyType.Single, serverDescriptions, 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION - 1, + MIN_SECONDARY_WRITE_WIRE_VERSION, new ObjectId(), MIN_SECONDARY_WRITE_WIRE_VERSION - 1 );