From 0d3e37dbdf00dc2661c5be6ac06382436c04ed13 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 8 Nov 2021 10:33:05 +0100 Subject: [PATCH 01/12] fix(NOODE-3648): implement same server selector --- src/sdam/server_selection.ts | 17 ++++ test/unit/sdam/server_selection.test.js | 125 ++++++++++++++++-------- 2 files changed, 102 insertions(+), 40 deletions(-) diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index a3248dfaa0..0f2f6f0030 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -31,6 +31,23 @@ export function writableServerSelector(): ServerSelector { ); } +/** + * The purpose of this selector is to select the same server, only + * if it is in a state that it can have commands sent to it. + */ +export function sameServerSelector(description: ServerDescription): ServerSelector { + return ( + topologyDescription: TopologyDescription, + servers: ServerDescription[] + ): ServerDescription[] => { + // Filter the servers to match the provided description only if + // the type is not unknown. + return servers.filter((s: ServerDescription) => { + return s.address === description.address && s.type !== ServerType.Unknown; + }); + }; +} + /** * Returns a server selector that uses a read preference to select a * server potentially for a write on a secondary. diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index 272183a6b7..102a4434c6 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -4,6 +4,7 @@ const { expect } = require('chai'); const { ObjectId } = require('../../../src/bson'); const { ReadPreference } = require('../../../src/read_preference'); const { + sameServerSelector, secondaryWritableServerSelector, MIN_SECONDARY_WRITE_WIRE_VERSION } = require('../../../src/sdam/server_selection'); @@ -31,6 +32,50 @@ describe('server selection', function () { isWritablePrimary: true, ok: 1 }); + const unknown = new ServerDescription('127.0.0.1:27022', { + ok: 0 + }); + + describe('#sameServerSelector', function () { + const serverDescriptions = new Map(); + serverDescriptions.set('127.0.0.1:27017', primary); + serverDescriptions.set('127.0.0.1:27018', unknown); + + context('when the server is unknown', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + const selector = sameServerSelector(unknown); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('returns an empty array', function () { + expect(servers).to.be.empty; + }); + }); + + context('when the server is not unknown', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + + const selector = sameServerSelector(primary); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('returns the server', function () { + expect(servers).to.deep.equal([primary]); + }); + }); + }); describe('#secondaryWritableServerSelector', function () { context('when the topology is a replica set', function () { @@ -53,19 +98,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('uses the provided read preference', function () { - expect(server).to.deep.equal([secondary]); + expect(servers).to.deep.equal([secondary]); }); }); context('when a read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + expect(servers).to.deep.equal([primary]); }); }); }); @@ -85,19 +130,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION - 1, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + expect(servers).to.deep.equal([primary]); }); }); context('when read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + expect(servers).to.deep.equal([primary]); }); }); }); @@ -112,10 +157,10 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION ); const selector = secondaryWritableServerSelector(undefined, ReadPreference.secondary); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + expect(servers).to.deep.equal([primary]); }); }); }); @@ -139,19 +184,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); context('when a read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); }); @@ -171,19 +216,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION - 1, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); context('when read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); }); @@ -198,10 +243,10 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION ); const selector = secondaryWritableServerSelector(); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); }); @@ -225,19 +270,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); context('when a read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); }); @@ -257,19 +302,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION - 1, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); context('when read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); }); @@ -284,10 +329,10 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION ); const selector = secondaryWritableServerSelector(); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); }); @@ -311,19 +356,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); context('when a read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); }); @@ -343,19 +388,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION - 1, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); context('when read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); }); @@ -370,10 +415,10 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION ); const selector = secondaryWritableServerSelector(); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); }); From 05db07af47aa52a0a365a48299d577683aedc2ee Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 8 Nov 2021 10:45:06 +0100 Subject: [PATCH 02/12] fix(NODE-3648): cursor iterating ops use same selector --- src/operations/execute_operation.ts | 17 +++++++++++++---- src/operations/operation.ts | 3 ++- src/sdam/server_selection.ts | 3 ++- test/unit/sdam/server_selection.test.js | 17 +++++++++++++++++ 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 4ac0a368e9..12149c2c61 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -17,7 +17,11 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import type { Document } from '../bson'; import { supportsRetryableWrites } from '../utils'; -import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection'; +import { + sameServerSelector, + secondaryWritableServerSelector, + ServerSelector +} from '../sdam/server_selection'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = @@ -153,9 +157,14 @@ function executeWithServerSelection( let selector: ReadPreference | ServerSelector; - // If operation should try to write to secondary use the custom server selector - // otherwise provide the read preference. - if (operation.trySecondaryWrite) { + if (operation.hasAspect(Aspect.CURSOR_ITERATING)) { + // Get more operations must always select the same server, but run through + // server selection to potentially force monitor checks if the server is + // in an unknown state. + selector = sameServerSelector(operation.server?.description); + } else if (operation.trySecondaryWrite) { + // If operation should try to write to secondary use the custom server selector + // otherwise provide the read preference. selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference); } else { selector = readPreference; diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 66198da456..d26fbf2ce7 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -10,7 +10,8 @@ export const Aspect = { RETRYABLE: Symbol('RETRYABLE'), EXPLAINABLE: Symbol('EXPLAINABLE'), SKIP_COLLATION: Symbol('SKIP_COLLATION'), - CURSOR_CREATING: Symbol('CURSOR_CREATING') + CURSOR_CREATING: Symbol('CURSOR_CREATING'), + CURSOR_ITERATING: Symbol('CURSOR_ITERATING') } as const; /** @public */ diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 0f2f6f0030..a0a3cd7a51 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -35,11 +35,12 @@ export function writableServerSelector(): ServerSelector { * The purpose of this selector is to select the same server, only * if it is in a state that it can have commands sent to it. */ -export function sameServerSelector(description: ServerDescription): ServerSelector { +export function sameServerSelector(description?: ServerDescription): ServerSelector { return ( topologyDescription: TopologyDescription, servers: ServerDescription[] ): ServerDescription[] => { + if (!description) return []; // Filter the servers to match the provided description only if // the type is not unknown. return servers.filter((s: ServerDescription) => { diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index 102a4434c6..bacd032f10 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -75,6 +75,23 @@ describe('server selection', function () { expect(servers).to.deep.equal([primary]); }); }); + + context('when no server description provided', function () { + const topologyDescription = new TopologyDescription( + TopologyType.ReplicaSetWithPrimary, + serverDescriptions, + 'test', + MIN_SECONDARY_WRITE_WIRE_VERSION, + new ObjectId(), + MIN_SECONDARY_WRITE_WIRE_VERSION + ); + const selector = sameServerSelector(); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); + + it('returns an empty array', function () { + expect(servers).to.be.empty; + }); + }); }); describe('#secondaryWritableServerSelector', function () { From 7d57268fcd9379ff77225d1ca394038a98fed92d Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 8 Nov 2021 12:05:13 +0100 Subject: [PATCH 03/12] fix(NODE-3648): add get more operation --- src/operations/get_more.ts | 48 +++++++++++++++++++++++++++ test/unit/operations/get_more.test.js | 26 +++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 src/operations/get_more.ts create mode 100644 test/unit/operations/get_more.test.js diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts new file mode 100644 index 0000000000..0b3647de29 --- /dev/null +++ b/src/operations/get_more.ts @@ -0,0 +1,48 @@ +import type { Document, Long } from '../bson'; +import type { Callback, MongoDBNamespace } from '../utils'; +import type { Server } from '../sdam/server'; +import { Aspect, AbstractOperation, OperationOptions, defineAspects } from './operation'; +import type { ClientSession } from '../sessions'; + +/** + * @public + * @typeParam TSchema - Unused schema definition, deprecated usage, only specify `FindOptions` with no generic + */ +// eslint-disable-next-line @typescript-eslint/no-unused-vars +export interface GetMoreOptions extends OperationOptions { + /** Set the batchSize for the getMoreCommand when iterating over the query results. */ + batchSize?: number; + /** You can put a $comment field on a query to make looking in the profiler logs simpler. */ + comment?: string | Document; + /** Number of milliseconds to wait before aborting the query. */ + maxTimeMS?: number; +} + +/** @internal */ +export class GetMoreOperation extends AbstractOperation { + cursorId: Long; + options: GetMoreOptions; + + constructor(ns: MongoDBNamespace, cursorId: Long, options: GetMoreOptions = {}) { + super(options); + this.options = options; + this.ns = ns; + this.cursorId = cursorId; + } + + execute(server: Server, session: ClientSession, callback: Callback): void { + this.server = server; + + server.getMore( + this.ns, + this.cursorId, + { + ...this.options, + session: session + }, + callback + ); + } +} + +defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE, Aspect.CURSOR_ITERATING]); diff --git a/test/unit/operations/get_more.test.js b/test/unit/operations/get_more.test.js new file mode 100644 index 0000000000..740f295a11 --- /dev/null +++ b/test/unit/operations/get_more.test.js @@ -0,0 +1,26 @@ +'use strict'; + +const { expect } = require('chai'); +const { Long } = require('../../../src/bson'); +const { GetMoreOperation } = require('../../../src/operations/get_more'); + +describe('GetMoreOperation', function () { + describe('#constructor', function () { + const ns = 'db.coll'; + const cursorId = Long.fromNumber(1); + const options = { batchSize: 100, comment: 'test', maxTimeMS: 500 }; + const operation = new GetMoreOperation(ns, cursorId, options); + + it('sets the namespace', function () { + expect(operation.ns).to.equal(ns); + }); + + it('sets the cursorId', function () { + expect(operation.cursorId).to.equal(cursorId); + }); + + it('sets the options', function () { + expect(operation.options).to.deep.equal(options); + }); + }); +}); From 6dd5d5f5f2415e324b30a90f9af11700f9cfd75c Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 8 Nov 2021 12:58:55 +0100 Subject: [PATCH 04/12] fix(NODE-3648): unit test get more execute --- src/operations/get_more.ts | 10 ++++--- test/unit/operations/get_more.test.js | 38 ++++++++++++++++++++++++--- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index 0b3647de29..242b4a4525 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -22,17 +22,21 @@ export interface GetMoreOptions extends Ope export class GetMoreOperation extends AbstractOperation { cursorId: Long; options: GetMoreOptions; + server: Server; - constructor(ns: MongoDBNamespace, cursorId: Long, options: GetMoreOptions = {}) { + constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) { super(options); this.options = options; this.ns = ns; this.cursorId = cursorId; + this.server = server; } + /** + * Although there is a server already associated with the get more operation, the signature + * for execute passes a server so we will just use that one. + */ execute(server: Server, session: ClientSession, callback: Callback): void { - this.server = server; - server.getMore( this.ns, this.cursorId, diff --git a/test/unit/operations/get_more.test.js b/test/unit/operations/get_more.test.js index 740f295a11..1dbc3b7164 100644 --- a/test/unit/operations/get_more.test.js +++ b/test/unit/operations/get_more.test.js @@ -1,15 +1,20 @@ 'use strict'; +const sinon = require('sinon'); const { expect } = require('chai'); const { Long } = require('../../../src/bson'); const { GetMoreOperation } = require('../../../src/operations/get_more'); +const { Server } = require('../../../src/sdam/server'); +const { ClientSession } = require('../../../src/sessions'); describe('GetMoreOperation', function () { + const ns = 'db.coll'; + const cursorId = Long.fromNumber(1); + const options = { batchSize: 100, comment: 'test', maxTimeMS: 500 }; + describe('#constructor', function () { - const ns = 'db.coll'; - const cursorId = Long.fromNumber(1); - const options = { batchSize: 100, comment: 'test', maxTimeMS: 500 }; - const operation = new GetMoreOperation(ns, cursorId, options); + const server = sinon.createStubInstance(Server, {}); + const operation = new GetMoreOperation(ns, cursorId, server, options); it('sets the namespace', function () { expect(operation.ns).to.equal(ns); @@ -19,8 +24,33 @@ describe('GetMoreOperation', function () { expect(operation.cursorId).to.equal(cursorId); }); + it('sets the server', function () { + expect(operation.server).to.equal(server); + }); + it('sets the options', function () { expect(operation.options).to.deep.equal(options); }); }); + + describe('#execute', function () { + const getMoreStub = sinon.stub().yields(undefined); + const server = sinon.createStubInstance(Server, { + getMore: getMoreStub + }); + const session = sinon.createStubInstance(ClientSession); + const operation = new GetMoreOperation(ns, cursorId, server, options); + + it('executes a getmore on the provided server', function (done) { + const callback = () => { + const call = getMoreStub.getCall(0); + expect(getMoreStub.calledOnce).to.be.true; + expect(call.args[0]).to.equal(ns); + expect(call.args[1]).to.equal(cursorId); + expect(call.args[2]).to.deep.equal({ ...options, session }); + done(); + }; + operation.execute(server, session, callback); + }); + }); }); From f21b49a13e6112fed853e54e6641468f152d714d Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 8 Nov 2021 15:41:40 +0100 Subject: [PATCH 05/12] fix(NODE-3648): hook operation into cursor --- src/cursor/abstract_cursor.ts | 20 +++++++++----------- src/operations/get_more.ts | 12 ++---------- test/functional/change_stream_spec.test.js | 2 ++ test/unit/operations/get_more.test.js | 13 ++++++++++--- 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index b4be34bc28..f30fbf1e91 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -14,7 +14,8 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { Topology } from '../sdam/topology'; import { Readable, Transform } from 'stream'; -import type { ExecutionResult } from '../operations/execute_operation'; +import { executeOperation, ExecutionResult } from '../operations/execute_operation'; +import { GetMoreOperation } from '../operations/get_more'; import { ReadConcern, ReadConcernLike } from '../read_concern'; import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types'; @@ -610,16 +611,13 @@ export abstract class AbstractCursor< return; } - server.getMore( - cursorNs, - cursorId, - { - ...this[kOptions], - session: this[kSession], - batchSize - }, - callback - ); + const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, { + ...this[kOptions], + session: this[kSession], + batchSize + }); + + executeOperation(this.topology, getMoreOperation, callback); } } diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index 242b4a4525..e6d15fbee3 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -37,16 +37,8 @@ export class GetMoreOperation extends AbstractOperation { * for execute passes a server so we will just use that one. */ execute(server: Server, session: ClientSession, callback: Callback): void { - server.getMore( - this.ns, - this.cursorId, - { - ...this.options, - session: session - }, - callback - ); + server.getMore(this.ns, this.cursorId, this.options, callback); } } -defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.RETRYABLE, Aspect.CURSOR_ITERATING]); +defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]); diff --git a/test/functional/change_stream_spec.test.js b/test/functional/change_stream_spec.test.js index e08199e612..ada7affa63 100644 --- a/test/functional/change_stream_spec.test.js +++ b/test/functional/change_stream_spec.test.js @@ -65,6 +65,7 @@ describe('Change Stream Spec - v1', function () { ctx.database = ctx.client.db(sDB); ctx.collection = ctx.database.collection(sColl); ctx.client.on('commandStarted', e => { + console.log(e); if (e.commandName !== 'ismaster') _events.push(e); }); }); @@ -170,6 +171,7 @@ describe('Change Stream Spec - v1', function () { const expectedEvents = test.expectations || []; return function testAPM(ctx, events) { + console.log('events', events); expectedEvents .map(e => e.command_started_event) .map(normalizeAPMEvent) diff --git a/test/unit/operations/get_more.test.js b/test/unit/operations/get_more.test.js index 1dbc3b7164..df0996a5ae 100644 --- a/test/unit/operations/get_more.test.js +++ b/test/unit/operations/get_more.test.js @@ -6,11 +6,17 @@ const { Long } = require('../../../src/bson'); const { GetMoreOperation } = require('../../../src/operations/get_more'); const { Server } = require('../../../src/sdam/server'); const { ClientSession } = require('../../../src/sessions'); +const { ReadPreference } = require('../../../src/read_preference'); describe('GetMoreOperation', function () { const ns = 'db.coll'; const cursorId = Long.fromNumber(1); - const options = { batchSize: 100, comment: 'test', maxTimeMS: 500 }; + const options = { + batchSize: 100, + comment: 'test', + maxTimeMS: 500, + readPreference: ReadPreference.primary + }; describe('#constructor', function () { const server = sinon.createStubInstance(Server, {}); @@ -39,7 +45,8 @@ describe('GetMoreOperation', function () { getMore: getMoreStub }); const session = sinon.createStubInstance(ClientSession); - const operation = new GetMoreOperation(ns, cursorId, server, options); + const opts = { ...options, session }; + const operation = new GetMoreOperation(ns, cursorId, server, opts); it('executes a getmore on the provided server', function (done) { const callback = () => { @@ -47,7 +54,7 @@ describe('GetMoreOperation', function () { expect(getMoreStub.calledOnce).to.be.true; expect(call.args[0]).to.equal(ns); expect(call.args[1]).to.equal(cursorId); - expect(call.args[2]).to.deep.equal({ ...options, session }); + expect(call.args[2]).to.deep.equal(opts); done(); }; operation.execute(server, session, callback); From 49f91c0921c5f75ba2e5795e5e2513c651eea292 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 8 Nov 2021 20:12:13 +0100 Subject: [PATCH 06/12] fix(NODE-3648): add get more operation aspect tests --- test/functional/change_stream_spec.test.js | 2 -- test/unit/operations/get_more.test.js | 30 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/test/functional/change_stream_spec.test.js b/test/functional/change_stream_spec.test.js index ada7affa63..e08199e612 100644 --- a/test/functional/change_stream_spec.test.js +++ b/test/functional/change_stream_spec.test.js @@ -65,7 +65,6 @@ describe('Change Stream Spec - v1', function () { ctx.database = ctx.client.db(sDB); ctx.collection = ctx.database.collection(sColl); ctx.client.on('commandStarted', e => { - console.log(e); if (e.commandName !== 'ismaster') _events.push(e); }); }); @@ -171,7 +170,6 @@ describe('Change Stream Spec - v1', function () { const expectedEvents = test.expectations || []; return function testAPM(ctx, events) { - console.log('events', events); expectedEvents .map(e => e.command_started_event) .map(normalizeAPMEvent) diff --git a/test/unit/operations/get_more.test.js b/test/unit/operations/get_more.test.js index df0996a5ae..29afc27b27 100644 --- a/test/unit/operations/get_more.test.js +++ b/test/unit/operations/get_more.test.js @@ -7,6 +7,7 @@ const { GetMoreOperation } = require('../../../src/operations/get_more'); const { Server } = require('../../../src/sdam/server'); const { ClientSession } = require('../../../src/sessions'); const { ReadPreference } = require('../../../src/read_preference'); +const { Aspect } = require('../../../src/operations/operation'); describe('GetMoreOperation', function () { const ns = 'db.coll'; @@ -60,4 +61,33 @@ describe('GetMoreOperation', function () { operation.execute(server, session, callback); }); }); + + describe('#hasAspect', function () { + const server = sinon.createStubInstance(Server, {}); + const operation = new GetMoreOperation(ns, cursorId, server, options); + + context('when the aspect is cursor iterating', function () { + it('returns true', function () { + expect(operation.hasAspect(Aspect.CURSOR_ITERATING)).to.be.true; + }); + }); + + context('when the aspect is read', function () { + it('returns true', function () { + expect(operation.hasAspect(Aspect.READ_OPERATION)).to.be.true; + }); + }); + + context('when the aspect is write', function () { + it('returns false', function () { + expect(operation.hasAspect(Aspect.WRITE_OPERATION)).to.be.false; + }); + }); + + context('when the aspect is retryable', function () { + it('returns false', function () { + expect(operation.hasAspect(Aspect.RETRYABLE)).to.be.false; + }); + }); + }); }); From 7076ae1d92cd6fa53fe131e0c01c68c0a7666d97 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 8 Nov 2021 21:12:30 +0100 Subject: [PATCH 07/12] fix(NODE-3648): remove bad comment --- src/operations/get_more.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index e6d15fbee3..b9aa3e0195 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -6,7 +6,6 @@ import type { ClientSession } from '../sessions'; /** * @public - * @typeParam TSchema - Unused schema definition, deprecated usage, only specify `FindOptions` with no generic */ // eslint-disable-next-line @typescript-eslint/no-unused-vars export interface GetMoreOptions extends OperationOptions { From 5abdf35dce8fa4c06e1ca648c1d5143e4ae558cb Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 17 Nov 2021 14:34:30 +0000 Subject: [PATCH 08/12] fix: only extend operation options. --- src/operations/get_more.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index b9aa3e0195..85d7301d7d 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -8,7 +8,7 @@ import type { ClientSession } from '../sessions'; * @public */ // eslint-disable-next-line @typescript-eslint/no-unused-vars -export interface GetMoreOptions extends OperationOptions { +export interface GetMoreOptions extends OperationOptions { /** Set the batchSize for the getMoreCommand when iterating over the query results. */ batchSize?: number; /** You can put a $comment field on a query to make looking in the profiler logs simpler. */ From 7f3c7d365030da84145596eb85b31ee93ff6de3a Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 17 Nov 2021 14:35:45 +0000 Subject: [PATCH 09/12] fix: use implied typing. Co-authored-by: Neal Beeken --- src/sdam/server_selection.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index a0a3cd7a51..422a49207e 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -43,7 +43,8 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec if (!description) return []; // Filter the servers to match the provided description only if // the type is not unknown. - return servers.filter((s: ServerDescription) => { + return servers.filter(sd => { + return sd.address === description.address && sd.type !== ServerType.Unknown; return s.address === description.address && s.type !== ServerType.Unknown; }); }; From 24c47508b4dbe9306c5cfe438795130384d32ef2 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 17 Nov 2021 16:53:06 +0100 Subject: [PATCH 10/12] feat(NODE-3648): add check for same server --- src/operations/get_more.ts | 6 +++ src/sdam/server_selection.ts | 1 - test/unit/operations/get_more.test.js | 59 +++++++++++++++++++-------- 3 files changed, 48 insertions(+), 18 deletions(-) diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index 85d7301d7d..a11824b64f 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -1,4 +1,5 @@ import type { Document, Long } from '../bson'; +import { MongoRuntimeError } from '../error'; import type { Callback, MongoDBNamespace } from '../utils'; import type { Server } from '../sdam/server'; import { Aspect, AbstractOperation, OperationOptions, defineAspects } from './operation'; @@ -36,6 +37,11 @@ export class GetMoreOperation extends AbstractOperation { * for execute passes a server so we will just use that one. */ execute(server: Server, session: ClientSession, callback: Callback): void { + if (server !== this.server) { + return callback( + new MongoRuntimeError('Getmore must run on the same server operation began on') + ); + } server.getMore(this.ns, this.cursorId, this.options, callback); } } diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 422a49207e..5a51a7a94e 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -45,7 +45,6 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec // the type is not unknown. return servers.filter(sd => { return sd.address === description.address && sd.type !== ServerType.Unknown; - return s.address === description.address && s.type !== ServerType.Unknown; }); }; } diff --git a/test/unit/operations/get_more.test.js b/test/unit/operations/get_more.test.js index 29afc27b27..704c8c63b1 100644 --- a/test/unit/operations/get_more.test.js +++ b/test/unit/operations/get_more.test.js @@ -8,6 +8,7 @@ const { Server } = require('../../../src/sdam/server'); const { ClientSession } = require('../../../src/sessions'); const { ReadPreference } = require('../../../src/read_preference'); const { Aspect } = require('../../../src/operations/operation'); +const { MongoRuntimeError } = require('../../../src/error'); describe('GetMoreOperation', function () { const ns = 'db.coll'; @@ -41,24 +42,48 @@ describe('GetMoreOperation', function () { }); describe('#execute', function () { - const getMoreStub = sinon.stub().yields(undefined); - const server = sinon.createStubInstance(Server, { - getMore: getMoreStub + context('when the server is the same as the instance', function () { + const getMoreStub = sinon.stub().yields(undefined); + const server = sinon.createStubInstance(Server, { + getMore: getMoreStub + }); + const session = sinon.createStubInstance(ClientSession); + const opts = { ...options, session }; + const operation = new GetMoreOperation(ns, cursorId, server, opts); + + it('executes a getmore on the provided server', function (done) { + const callback = () => { + const call = getMoreStub.getCall(0); + expect(getMoreStub.calledOnce).to.be.true; + expect(call.args[0]).to.equal(ns); + expect(call.args[1]).to.equal(cursorId); + expect(call.args[2]).to.deep.equal(opts); + done(); + }; + operation.execute(server, session, callback); + }); }); - const session = sinon.createStubInstance(ClientSession); - const opts = { ...options, session }; - const operation = new GetMoreOperation(ns, cursorId, server, opts); - - it('executes a getmore on the provided server', function (done) { - const callback = () => { - const call = getMoreStub.getCall(0); - expect(getMoreStub.calledOnce).to.be.true; - expect(call.args[0]).to.equal(ns); - expect(call.args[1]).to.equal(cursorId); - expect(call.args[2]).to.deep.equal(opts); - done(); - }; - operation.execute(server, session, callback); + + context('when the server is not the same as the instance', function () { + const getMoreStub = sinon.stub().yields(undefined); + const server = sinon.createStubInstance(Server, { + getMore: getMoreStub + }); + const newServer = sinon.createStubInstance(Server, { + getMore: getMoreStub + }); + const session = sinon.createStubInstance(ClientSession); + const opts = { ...options, session }; + const operation = new GetMoreOperation(ns, cursorId, server, opts); + + it('errors in the callback', function (done) { + const callback = error => { + expect(error).to.be.instanceOf(MongoRuntimeError); + expect(error.message).to.equal('Getmore must run on the same server operation began on'); + done(); + }; + operation.execute(newServer, session, callback); + }); }); }); From 2f2690e25194b2b3cbbb25f152a81168db483522 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 18 Nov 2021 10:18:39 +0100 Subject: [PATCH 11/12] test: dry up getmore and server selection tests --- test/unit/operations/get_more.test.js | 8 +-- test/unit/sdam/server_selection.test.js | 72 ++++++++++++------------- 2 files changed, 38 insertions(+), 42 deletions(-) diff --git a/test/unit/operations/get_more.test.js b/test/unit/operations/get_more.test.js index 704c8c63b1..501e488ebd 100644 --- a/test/unit/operations/get_more.test.js +++ b/test/unit/operations/get_more.test.js @@ -11,14 +11,14 @@ const { Aspect } = require('../../../src/operations/operation'); const { MongoRuntimeError } = require('../../../src/error'); describe('GetMoreOperation', function () { - const ns = 'db.coll'; - const cursorId = Long.fromNumber(1); - const options = { + const ns = Object.freeze('db.coll'); + const cursorId = Object.freeze(Long.fromNumber(1)); + const options = Object.freeze({ batchSize: 100, comment: 'test', maxTimeMS: 500, readPreference: ReadPreference.primary - }; + }); describe('#constructor', function () { const server = sinon.createStubInstance(Server, {}); diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index bacd032f10..4f32651be2 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -1,6 +1,7 @@ 'use strict'; const { expect } = require('chai'); +const sinon = require('sinon'); const { ObjectId } = require('../../../src/bson'); const { ReadPreference } = require('../../../src/read_preference'); const { @@ -37,21 +38,21 @@ describe('server selection', function () { }); describe('#sameServerSelector', function () { + const topologyDescription = sinon.stub(); const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27017', primary); - serverDescriptions.set('127.0.0.1:27018', unknown); + serverDescriptions.set(primary.address, primary); + serverDescriptions.set(unknown.address, unknown); + let selector; + let servers; + + beforeEach(function () { + servers = selector(topologyDescription, Array.from(serverDescriptions.values())); + }); context('when the server is unknown', function () { - const topologyDescription = new TopologyDescription( - TopologyType.ReplicaSetWithPrimary, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = sameServerSelector(unknown); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); + before(function () { + selector = sameServerSelector(unknown); + }); it('returns an empty array', function () { expect(servers).to.be.empty; @@ -59,17 +60,9 @@ describe('server selection', function () { }); context('when the server is not unknown', function () { - const topologyDescription = new TopologyDescription( - TopologyType.ReplicaSetWithPrimary, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - - const selector = sameServerSelector(primary); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); + before(function () { + selector = sameServerSelector(primary); + }); it('returns the server', function () { expect(servers).to.deep.equal([primary]); @@ -77,16 +70,19 @@ describe('server selection', function () { }); context('when no server description provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.ReplicaSetWithPrimary, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = sameServerSelector(); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); + before(function () { + selector = sameServerSelector(); + }); + + it('returns an empty array', function () { + expect(servers).to.be.empty; + }); + }); + + context('when the server is not the same', function () { + before(function () { + selector = sameServerSelector(secondary); + }); it('returns an empty array', function () { expect(servers).to.be.empty; @@ -97,8 +93,8 @@ describe('server selection', function () { describe('#secondaryWritableServerSelector', function () { context('when the topology is a replica set', function () { const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27017', primary); - serverDescriptions.set('127.0.0.1:27018', secondary); + serverDescriptions.set(primary.address, primary); + serverDescriptions.set(secondary.address, secondary); context('when the common server version is >= 5.0', function () { const topologyDescription = new TopologyDescription( @@ -184,7 +180,7 @@ describe('server selection', function () { context('when the topology is sharded', function () { const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27019', mongos); + serverDescriptions.set(mongos.address, mongos); context('when the common server version is >= 5.0', function () { const topologyDescription = new TopologyDescription( @@ -270,7 +266,7 @@ describe('server selection', function () { context('when the topology is load balanced', function () { const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27020', loadBalancer); + serverDescriptions.set(loadBalancer.address, loadBalancer); context('when the common server version is >= 5.0', function () { const topologyDescription = new TopologyDescription( @@ -356,7 +352,7 @@ describe('server selection', function () { context('when the topology is single', function () { const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27020', single); + serverDescriptions.set(single.address, single); context('when the common server version is >= 5.0', function () { const topologyDescription = new TopologyDescription( From fc88b8e7ca29f39cc1d800ae49aa2a0384fc311c Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 18 Nov 2021 16:32:08 +0100 Subject: [PATCH 12/12] fix: dont freeze string Co-authored-by: Daria Pardue --- test/unit/operations/get_more.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/operations/get_more.test.js b/test/unit/operations/get_more.test.js index 501e488ebd..99621a1f64 100644 --- a/test/unit/operations/get_more.test.js +++ b/test/unit/operations/get_more.test.js @@ -11,7 +11,7 @@ const { Aspect } = require('../../../src/operations/operation'); const { MongoRuntimeError } = require('../../../src/error'); describe('GetMoreOperation', function () { - const ns = Object.freeze('db.coll'); + const ns = 'db.coll'; const cursorId = Object.freeze(Long.fromNumber(1)); const options = Object.freeze({ batchSize: 100,