Skip to content

Commit

Permalink
fix(NODE-3648): run get more ops through server selection (#3030)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Nov 18, 2021
1 parent ac44599 commit 268e211
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 61 deletions.
20 changes: 9 additions & 11 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -14,7 +14,8 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import { Readable, Transform } from 'stream';
import type { ExecutionResult } from '../operations/execute_operation';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { ReadConcern, ReadConcernLike } from '../read_concern';
import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types';

Expand Down Expand Up @@ -610,16 +611,13 @@ export abstract class AbstractCursor<
return;
}

server.getMore(
cursorNs,
cursorId,
{
...this[kOptions],
session: this[kSession],
batchSize
},
callback
);
const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
...this[kOptions],
session: this[kSession],
batchSize
});

executeOperation(this.topology, getMoreOperation, callback);
}
}

Expand Down
17 changes: 13 additions & 4 deletions src/operations/execute_operation.ts
Expand Up @@ -17,7 +17,11 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import type { Document } from '../bson';
import { supportsRetryableWrites } from '../utils';
import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection';
import {
sameServerSelector,
secondaryWritableServerSelector,
ServerSelector
} from '../sdam/server_selection';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
Expand Down Expand Up @@ -153,9 +157,14 @@ function executeWithServerSelection(

let selector: ReadPreference | ServerSelector;

// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
if (operation.trySecondaryWrite) {
if (operation.hasAspect(Aspect.CURSOR_ITERATING)) {
// Get more operations must always select the same server, but run through
// server selection to potentially force monitor checks if the server is
// in an unknown state.
selector = sameServerSelector(operation.server?.description);
} else if (operation.trySecondaryWrite) {
// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
} else {
selector = readPreference;
Expand Down
49 changes: 49 additions & 0 deletions src/operations/get_more.ts
@@ -0,0 +1,49 @@
import type { Document, Long } from '../bson';
import { MongoRuntimeError } from '../error';
import type { Callback, MongoDBNamespace } from '../utils';
import type { Server } from '../sdam/server';
import { Aspect, AbstractOperation, OperationOptions, defineAspects } from './operation';
import type { ClientSession } from '../sessions';

/**
* @public
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export interface GetMoreOptions extends OperationOptions {
/** Set the batchSize for the getMoreCommand when iterating over the query results. */
batchSize?: number;
/** You can put a $comment field on a query to make looking in the profiler logs simpler. */
comment?: string | Document;
/** Number of milliseconds to wait before aborting the query. */
maxTimeMS?: number;
}

/** @internal */
export class GetMoreOperation extends AbstractOperation {
cursorId: Long;
options: GetMoreOptions;
server: Server;

constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
super(options);
this.options = options;
this.ns = ns;
this.cursorId = cursorId;
this.server = server;
}

/**
* Although there is a server already associated with the get more operation, the signature
* for execute passes a server so we will just use that one.
*/
execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
if (server !== this.server) {
return callback(
new MongoRuntimeError('Getmore must run on the same server operation began on')
);
}
server.getMore(this.ns, this.cursorId, this.options, callback);
}
}

defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]);
3 changes: 2 additions & 1 deletion src/operations/operation.ts
Expand Up @@ -10,7 +10,8 @@ export const Aspect = {
RETRYABLE: Symbol('RETRYABLE'),
EXPLAINABLE: Symbol('EXPLAINABLE'),
SKIP_COLLATION: Symbol('SKIP_COLLATION'),
CURSOR_CREATING: Symbol('CURSOR_CREATING')
CURSOR_CREATING: Symbol('CURSOR_CREATING'),
CURSOR_ITERATING: Symbol('CURSOR_ITERATING')
} as const;

/** @public */
Expand Down
18 changes: 18 additions & 0 deletions src/sdam/server_selection.ts
Expand Up @@ -31,6 +31,24 @@ export function writableServerSelector(): ServerSelector {
);
}

/**
* The purpose of this selector is to select the same server, only
* if it is in a state that it can have commands sent to it.
*/
export function sameServerSelector(description?: ServerDescription): ServerSelector {
return (
topologyDescription: TopologyDescription,
servers: ServerDescription[]
): ServerDescription[] => {
if (!description) return [];
// Filter the servers to match the provided description only if
// the type is not unknown.
return servers.filter(sd => {
return sd.address === description.address && sd.type !== ServerType.Unknown;
});
};
}

/**
* Returns a server selector that uses a read preference to select a
* server potentially for a write on a secondary.
Expand Down
118 changes: 118 additions & 0 deletions test/unit/operations/get_more.test.js
@@ -0,0 +1,118 @@
'use strict';

const sinon = require('sinon');
const { expect } = require('chai');
const { Long } = require('../../../src/bson');
const { GetMoreOperation } = require('../../../src/operations/get_more');
const { Server } = require('../../../src/sdam/server');
const { ClientSession } = require('../../../src/sessions');
const { ReadPreference } = require('../../../src/read_preference');
const { Aspect } = require('../../../src/operations/operation');
const { MongoRuntimeError } = require('../../../src/error');

describe('GetMoreOperation', function () {
const ns = 'db.coll';
const cursorId = Object.freeze(Long.fromNumber(1));
const options = Object.freeze({
batchSize: 100,
comment: 'test',
maxTimeMS: 500,
readPreference: ReadPreference.primary
});

describe('#constructor', function () {
const server = sinon.createStubInstance(Server, {});
const operation = new GetMoreOperation(ns, cursorId, server, options);

it('sets the namespace', function () {
expect(operation.ns).to.equal(ns);
});

it('sets the cursorId', function () {
expect(operation.cursorId).to.equal(cursorId);
});

it('sets the server', function () {
expect(operation.server).to.equal(server);
});

it('sets the options', function () {
expect(operation.options).to.deep.equal(options);
});
});

describe('#execute', function () {
context('when the server is the same as the instance', function () {
const getMoreStub = sinon.stub().yields(undefined);
const server = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const session = sinon.createStubInstance(ClientSession);
const opts = { ...options, session };
const operation = new GetMoreOperation(ns, cursorId, server, opts);

it('executes a getmore on the provided server', function (done) {
const callback = () => {
const call = getMoreStub.getCall(0);
expect(getMoreStub.calledOnce).to.be.true;
expect(call.args[0]).to.equal(ns);
expect(call.args[1]).to.equal(cursorId);
expect(call.args[2]).to.deep.equal(opts);
done();
};
operation.execute(server, session, callback);
});
});

context('when the server is not the same as the instance', function () {
const getMoreStub = sinon.stub().yields(undefined);
const server = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const newServer = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const session = sinon.createStubInstance(ClientSession);
const opts = { ...options, session };
const operation = new GetMoreOperation(ns, cursorId, server, opts);

it('errors in the callback', function (done) {
const callback = error => {
expect(error).to.be.instanceOf(MongoRuntimeError);
expect(error.message).to.equal('Getmore must run on the same server operation began on');
done();
};
operation.execute(newServer, session, callback);
});
});
});

describe('#hasAspect', function () {
const server = sinon.createStubInstance(Server, {});
const operation = new GetMoreOperation(ns, cursorId, server, options);

context('when the aspect is cursor iterating', function () {
it('returns true', function () {
expect(operation.hasAspect(Aspect.CURSOR_ITERATING)).to.be.true;
});
});

context('when the aspect is read', function () {
it('returns true', function () {
expect(operation.hasAspect(Aspect.READ_OPERATION)).to.be.true;
});
});

context('when the aspect is write', function () {
it('returns false', function () {
expect(operation.hasAspect(Aspect.WRITE_OPERATION)).to.be.false;
});
});

context('when the aspect is retryable', function () {
it('returns false', function () {
expect(operation.hasAspect(Aspect.RETRYABLE)).to.be.false;
});
});
});
});

0 comments on commit 268e211

Please sign in to comment.