diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index b4be34bc28..f30fbf1e91 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -14,7 +14,8 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference'; import type { Server } from '../sdam/server'; import type { Topology } from '../sdam/topology'; import { Readable, Transform } from 'stream'; -import type { ExecutionResult } from '../operations/execute_operation'; +import { executeOperation, ExecutionResult } from '../operations/execute_operation'; +import { GetMoreOperation } from '../operations/get_more'; import { ReadConcern, ReadConcernLike } from '../read_concern'; import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types'; @@ -610,16 +611,13 @@ export abstract class AbstractCursor< return; } - server.getMore( - cursorNs, - cursorId, - { - ...this[kOptions], - session: this[kSession], - batchSize - }, - callback - ); + const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, { + ...this[kOptions], + session: this[kSession], + batchSize + }); + + executeOperation(this.topology, getMoreOperation, callback); } } diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 4ac0a368e9..12149c2c61 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -17,7 +17,11 @@ import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; import type { Document } from '../bson'; import { supportsRetryableWrites } from '../utils'; -import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection'; +import { + sameServerSelector, + secondaryWritableServerSelector, + ServerSelector +} from '../sdam/server_selection'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = @@ -153,9 +157,14 @@ function executeWithServerSelection( let selector: ReadPreference | ServerSelector; - // If operation should try to write to secondary use the custom server selector - // otherwise provide the read preference. - if (operation.trySecondaryWrite) { + if (operation.hasAspect(Aspect.CURSOR_ITERATING)) { + // Get more operations must always select the same server, but run through + // server selection to potentially force monitor checks if the server is + // in an unknown state. + selector = sameServerSelector(operation.server?.description); + } else if (operation.trySecondaryWrite) { + // If operation should try to write to secondary use the custom server selector + // otherwise provide the read preference. selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference); } else { selector = readPreference; diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts new file mode 100644 index 0000000000..a11824b64f --- /dev/null +++ b/src/operations/get_more.ts @@ -0,0 +1,49 @@ +import type { Document, Long } from '../bson'; +import { MongoRuntimeError } from '../error'; +import type { Callback, MongoDBNamespace } from '../utils'; +import type { Server } from '../sdam/server'; +import { Aspect, AbstractOperation, OperationOptions, defineAspects } from './operation'; +import type { ClientSession } from '../sessions'; + +/** + * @public + */ +// eslint-disable-next-line @typescript-eslint/no-unused-vars +export interface GetMoreOptions extends OperationOptions { + /** Set the batchSize for the getMoreCommand when iterating over the query results. */ + batchSize?: number; + /** You can put a $comment field on a query to make looking in the profiler logs simpler. */ + comment?: string | Document; + /** Number of milliseconds to wait before aborting the query. */ + maxTimeMS?: number; +} + +/** @internal */ +export class GetMoreOperation extends AbstractOperation { + cursorId: Long; + options: GetMoreOptions; + server: Server; + + constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) { + super(options); + this.options = options; + this.ns = ns; + this.cursorId = cursorId; + this.server = server; + } + + /** + * Although there is a server already associated with the get more operation, the signature + * for execute passes a server so we will just use that one. + */ + execute(server: Server, session: ClientSession, callback: Callback): void { + if (server !== this.server) { + return callback( + new MongoRuntimeError('Getmore must run on the same server operation began on') + ); + } + server.getMore(this.ns, this.cursorId, this.options, callback); + } +} + +defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]); diff --git a/src/operations/operation.ts b/src/operations/operation.ts index 66198da456..d26fbf2ce7 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -10,7 +10,8 @@ export const Aspect = { RETRYABLE: Symbol('RETRYABLE'), EXPLAINABLE: Symbol('EXPLAINABLE'), SKIP_COLLATION: Symbol('SKIP_COLLATION'), - CURSOR_CREATING: Symbol('CURSOR_CREATING') + CURSOR_CREATING: Symbol('CURSOR_CREATING'), + CURSOR_ITERATING: Symbol('CURSOR_ITERATING') } as const; /** @public */ diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index a3248dfaa0..5a51a7a94e 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -31,6 +31,24 @@ export function writableServerSelector(): ServerSelector { ); } +/** + * The purpose of this selector is to select the same server, only + * if it is in a state that it can have commands sent to it. + */ +export function sameServerSelector(description?: ServerDescription): ServerSelector { + return ( + topologyDescription: TopologyDescription, + servers: ServerDescription[] + ): ServerDescription[] => { + if (!description) return []; + // Filter the servers to match the provided description only if + // the type is not unknown. + return servers.filter(sd => { + return sd.address === description.address && sd.type !== ServerType.Unknown; + }); + }; +} + /** * Returns a server selector that uses a read preference to select a * server potentially for a write on a secondary. diff --git a/test/unit/operations/get_more.test.js b/test/unit/operations/get_more.test.js new file mode 100644 index 0000000000..99621a1f64 --- /dev/null +++ b/test/unit/operations/get_more.test.js @@ -0,0 +1,118 @@ +'use strict'; + +const sinon = require('sinon'); +const { expect } = require('chai'); +const { Long } = require('../../../src/bson'); +const { GetMoreOperation } = require('../../../src/operations/get_more'); +const { Server } = require('../../../src/sdam/server'); +const { ClientSession } = require('../../../src/sessions'); +const { ReadPreference } = require('../../../src/read_preference'); +const { Aspect } = require('../../../src/operations/operation'); +const { MongoRuntimeError } = require('../../../src/error'); + +describe('GetMoreOperation', function () { + const ns = 'db.coll'; + const cursorId = Object.freeze(Long.fromNumber(1)); + const options = Object.freeze({ + batchSize: 100, + comment: 'test', + maxTimeMS: 500, + readPreference: ReadPreference.primary + }); + + describe('#constructor', function () { + const server = sinon.createStubInstance(Server, {}); + const operation = new GetMoreOperation(ns, cursorId, server, options); + + it('sets the namespace', function () { + expect(operation.ns).to.equal(ns); + }); + + it('sets the cursorId', function () { + expect(operation.cursorId).to.equal(cursorId); + }); + + it('sets the server', function () { + expect(operation.server).to.equal(server); + }); + + it('sets the options', function () { + expect(operation.options).to.deep.equal(options); + }); + }); + + describe('#execute', function () { + context('when the server is the same as the instance', function () { + const getMoreStub = sinon.stub().yields(undefined); + const server = sinon.createStubInstance(Server, { + getMore: getMoreStub + }); + const session = sinon.createStubInstance(ClientSession); + const opts = { ...options, session }; + const operation = new GetMoreOperation(ns, cursorId, server, opts); + + it('executes a getmore on the provided server', function (done) { + const callback = () => { + const call = getMoreStub.getCall(0); + expect(getMoreStub.calledOnce).to.be.true; + expect(call.args[0]).to.equal(ns); + expect(call.args[1]).to.equal(cursorId); + expect(call.args[2]).to.deep.equal(opts); + done(); + }; + operation.execute(server, session, callback); + }); + }); + + context('when the server is not the same as the instance', function () { + const getMoreStub = sinon.stub().yields(undefined); + const server = sinon.createStubInstance(Server, { + getMore: getMoreStub + }); + const newServer = sinon.createStubInstance(Server, { + getMore: getMoreStub + }); + const session = sinon.createStubInstance(ClientSession); + const opts = { ...options, session }; + const operation = new GetMoreOperation(ns, cursorId, server, opts); + + it('errors in the callback', function (done) { + const callback = error => { + expect(error).to.be.instanceOf(MongoRuntimeError); + expect(error.message).to.equal('Getmore must run on the same server operation began on'); + done(); + }; + operation.execute(newServer, session, callback); + }); + }); + }); + + describe('#hasAspect', function () { + const server = sinon.createStubInstance(Server, {}); + const operation = new GetMoreOperation(ns, cursorId, server, options); + + context('when the aspect is cursor iterating', function () { + it('returns true', function () { + expect(operation.hasAspect(Aspect.CURSOR_ITERATING)).to.be.true; + }); + }); + + context('when the aspect is read', function () { + it('returns true', function () { + expect(operation.hasAspect(Aspect.READ_OPERATION)).to.be.true; + }); + }); + + context('when the aspect is write', function () { + it('returns false', function () { + expect(operation.hasAspect(Aspect.WRITE_OPERATION)).to.be.false; + }); + }); + + context('when the aspect is retryable', function () { + it('returns false', function () { + expect(operation.hasAspect(Aspect.RETRYABLE)).to.be.false; + }); + }); + }); +}); diff --git a/test/unit/sdam/server_selection.test.js b/test/unit/sdam/server_selection.test.js index 272183a6b7..4f32651be2 100644 --- a/test/unit/sdam/server_selection.test.js +++ b/test/unit/sdam/server_selection.test.js @@ -1,9 +1,11 @@ 'use strict'; const { expect } = require('chai'); +const sinon = require('sinon'); const { ObjectId } = require('../../../src/bson'); const { ReadPreference } = require('../../../src/read_preference'); const { + sameServerSelector, secondaryWritableServerSelector, MIN_SECONDARY_WRITE_WIRE_VERSION } = require('../../../src/sdam/server_selection'); @@ -31,12 +33,68 @@ describe('server selection', function () { isWritablePrimary: true, ok: 1 }); + const unknown = new ServerDescription('127.0.0.1:27022', { + ok: 0 + }); + + describe('#sameServerSelector', function () { + const topologyDescription = sinon.stub(); + const serverDescriptions = new Map(); + serverDescriptions.set(primary.address, primary); + serverDescriptions.set(unknown.address, unknown); + let selector; + let servers; + + beforeEach(function () { + servers = selector(topologyDescription, Array.from(serverDescriptions.values())); + }); + + context('when the server is unknown', function () { + before(function () { + selector = sameServerSelector(unknown); + }); + + it('returns an empty array', function () { + expect(servers).to.be.empty; + }); + }); + + context('when the server is not unknown', function () { + before(function () { + selector = sameServerSelector(primary); + }); + + it('returns the server', function () { + expect(servers).to.deep.equal([primary]); + }); + }); + + context('when no server description provided', function () { + before(function () { + selector = sameServerSelector(); + }); + + it('returns an empty array', function () { + expect(servers).to.be.empty; + }); + }); + + context('when the server is not the same', function () { + before(function () { + selector = sameServerSelector(secondary); + }); + + it('returns an empty array', function () { + expect(servers).to.be.empty; + }); + }); + }); describe('#secondaryWritableServerSelector', function () { context('when the topology is a replica set', function () { const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27017', primary); - serverDescriptions.set('127.0.0.1:27018', secondary); + serverDescriptions.set(primary.address, primary); + serverDescriptions.set(secondary.address, secondary); context('when the common server version is >= 5.0', function () { const topologyDescription = new TopologyDescription( @@ -53,19 +111,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('uses the provided read preference', function () { - expect(server).to.deep.equal([secondary]); + expect(servers).to.deep.equal([secondary]); }); }); context('when a read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + expect(servers).to.deep.equal([primary]); }); }); }); @@ -85,19 +143,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION - 1, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + expect(servers).to.deep.equal([primary]); }); }); context('when read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + expect(servers).to.deep.equal([primary]); }); }); }); @@ -112,17 +170,17 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION ); const selector = secondaryWritableServerSelector(undefined, ReadPreference.secondary); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a primary', function () { - expect(server).to.deep.equal([primary]); + expect(servers).to.deep.equal([primary]); }); }); }); context('when the topology is sharded', function () { const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27019', mongos); + serverDescriptions.set(mongos.address, mongos); context('when the common server version is >= 5.0', function () { const topologyDescription = new TopologyDescription( @@ -139,19 +197,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); context('when a read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); }); @@ -171,19 +229,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION - 1, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); context('when read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); }); @@ -198,17 +256,17 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION ); const selector = secondaryWritableServerSelector(); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a mongos', function () { - expect(server).to.deep.equal([mongos]); + expect(servers).to.deep.equal([mongos]); }); }); }); context('when the topology is load balanced', function () { const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27020', loadBalancer); + serverDescriptions.set(loadBalancer.address, loadBalancer); context('when the common server version is >= 5.0', function () { const topologyDescription = new TopologyDescription( @@ -225,19 +283,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); context('when a read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); }); @@ -257,19 +315,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION - 1, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); context('when read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); }); @@ -284,17 +342,17 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION ); const selector = secondaryWritableServerSelector(); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a load balancer', function () { - expect(server).to.deep.equal([loadBalancer]); + expect(servers).to.deep.equal([loadBalancer]); }); }); }); context('when the topology is single', function () { const serverDescriptions = new Map(); - serverDescriptions.set('127.0.0.1:27020', single); + serverDescriptions.set(single.address, single); context('when the common server version is >= 5.0', function () { const topologyDescription = new TopologyDescription( @@ -311,19 +369,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); context('when a read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); }); @@ -343,19 +401,19 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION - 1, ReadPreference.secondary ); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); context('when read preference is not provided', function () { const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION - 1); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); }); @@ -370,10 +428,10 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION ); const selector = secondaryWritableServerSelector(); - const server = selector(topologyDescription, Array.from(serverDescriptions.values())); + const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); it('selects a standalone', function () { - expect(server).to.deep.equal([single]); + expect(servers).to.deep.equal([single]); }); }); });