Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-3648): run get more ops through server selection #3030

Merged
merged 12 commits into from Nov 18, 2021
20 changes: 9 additions & 11 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -14,7 +14,8 @@ import { ReadPreference, ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
import { Readable, Transform } from 'stream';
import type { ExecutionResult } from '../operations/execute_operation';
import { executeOperation, ExecutionResult } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { ReadConcern, ReadConcernLike } from '../read_concern';
import { TODO_NODE_3286, TypedEventEmitter } from '../mongo_types';

Expand Down Expand Up @@ -610,16 +611,13 @@ export abstract class AbstractCursor<
return;
}

server.getMore(
cursorNs,
cursorId,
{
...this[kOptions],
session: this[kSession],
batchSize
},
callback
);
const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
...this[kOptions],
session: this[kSession],
batchSize
});

executeOperation(this.topology, getMoreOperation, callback);
}
}

Expand Down
17 changes: 13 additions & 4 deletions src/operations/execute_operation.ts
Expand Up @@ -17,7 +17,11 @@ import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import type { Document } from '../bson';
import { supportsRetryableWrites } from '../utils';
import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection';
import {
sameServerSelector,
secondaryWritableServerSelector,
ServerSelector
} from '../sdam/server_selection';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
Expand Down Expand Up @@ -153,9 +157,14 @@ function executeWithServerSelection(

let selector: ReadPreference | ServerSelector;

// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
if (operation.trySecondaryWrite) {
if (operation.hasAspect(Aspect.CURSOR_ITERATING)) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
// Get more operations must always select the same server, but run through
// server selection to potentially force monitor checks if the server is
// in an unknown state.
selector = sameServerSelector(operation.server?.description);
} else if (operation.trySecondaryWrite) {
// If operation should try to write to secondary use the custom server selector
// otherwise provide the read preference.
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
} else {
selector = readPreference;
Expand Down
49 changes: 49 additions & 0 deletions src/operations/get_more.ts
@@ -0,0 +1,49 @@
import type { Document, Long } from '../bson';
import { MongoRuntimeError } from '../error';
import type { Callback, MongoDBNamespace } from '../utils';
import type { Server } from '../sdam/server';
import { Aspect, AbstractOperation, OperationOptions, defineAspects } from './operation';
import type { ClientSession } from '../sessions';

/**
* @public
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export interface GetMoreOptions extends OperationOptions {
/** Set the batchSize for the getMoreCommand when iterating over the query results. */
batchSize?: number;
/** You can put a $comment field on a query to make looking in the profiler logs simpler. */
comment?: string | Document;
/** Number of milliseconds to wait before aborting the query. */
maxTimeMS?: number;
}

/** @internal */
export class GetMoreOperation extends AbstractOperation {
cursorId: Long;
options: GetMoreOptions;
server: Server;

constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
super(options);
this.options = options;
this.ns = ns;
this.cursorId = cursorId;
this.server = server;
}

/**
* Although there is a server already associated with the get more operation, the signature
* for execute passes a server so we will just use that one.
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
*/
execute(server: Server, session: ClientSession, callback: Callback<Document>): void {
if (server !== this.server) {
return callback(
new MongoRuntimeError('Getmore must run on the same server operation began on')
);
}
server.getMore(this.ns, this.cursorId, this.options, callback);
}
}

defineAspects(GetMoreOperation, [Aspect.READ_OPERATION, Aspect.CURSOR_ITERATING]);
3 changes: 2 additions & 1 deletion src/operations/operation.ts
Expand Up @@ -10,7 +10,8 @@ export const Aspect = {
RETRYABLE: Symbol('RETRYABLE'),
EXPLAINABLE: Symbol('EXPLAINABLE'),
SKIP_COLLATION: Symbol('SKIP_COLLATION'),
CURSOR_CREATING: Symbol('CURSOR_CREATING')
CURSOR_CREATING: Symbol('CURSOR_CREATING'),
CURSOR_ITERATING: Symbol('CURSOR_ITERATING')
} as const;

/** @public */
Expand Down
18 changes: 18 additions & 0 deletions src/sdam/server_selection.ts
Expand Up @@ -31,6 +31,24 @@ export function writableServerSelector(): ServerSelector {
);
}

/**
* The purpose of this selector is to select the same server, only
* if it is in a state that it can have commands sent to it.
*/
export function sameServerSelector(description?: ServerDescription): ServerSelector {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
return (
topologyDescription: TopologyDescription,
servers: ServerDescription[]
): ServerDescription[] => {
if (!description) return [];
// Filter the servers to match the provided description only if
// the type is not unknown.
return servers.filter(sd => {
return sd.address === description.address && sd.type !== ServerType.Unknown;
});
};
}

/**
* Returns a server selector that uses a read preference to select a
* server potentially for a write on a secondary.
Expand Down
118 changes: 118 additions & 0 deletions test/unit/operations/get_more.test.js
@@ -0,0 +1,118 @@
'use strict';

const sinon = require('sinon');
const { expect } = require('chai');
const { Long } = require('../../../src/bson');
const { GetMoreOperation } = require('../../../src/operations/get_more');
const { Server } = require('../../../src/sdam/server');
const { ClientSession } = require('../../../src/sessions');
const { ReadPreference } = require('../../../src/read_preference');
const { Aspect } = require('../../../src/operations/operation');
const { MongoRuntimeError } = require('../../../src/error');

describe('GetMoreOperation', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const ns = 'db.coll';
const cursorId = Long.fromNumber(1);
const options = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we freeze these so that we can make sure the options aren't mutated? (otherwise that deep equal check below doesn't guarantee as much)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now all params are frozen.

batchSize: 100,
comment: 'test',
maxTimeMS: 500,
readPreference: ReadPreference.primary
};

describe('#constructor', function () {
const server = sinon.createStubInstance(Server, {});
const operation = new GetMoreOperation(ns, cursorId, server, options);

it('sets the namespace', function () {
expect(operation.ns).to.equal(ns);
});

it('sets the cursorId', function () {
expect(operation.cursorId).to.equal(cursorId);
});

it('sets the server', function () {
expect(operation.server).to.equal(server);
});

it('sets the options', function () {
expect(operation.options).to.deep.equal(options);
});
});

describe('#execute', function () {
context('when the server is the same as the instance', function () {
const getMoreStub = sinon.stub().yields(undefined);
const server = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const session = sinon.createStubInstance(ClientSession);
const opts = { ...options, session };
const operation = new GetMoreOperation(ns, cursorId, server, opts);

it('executes a getmore on the provided server', function (done) {
const callback = () => {
const call = getMoreStub.getCall(0);
expect(getMoreStub.calledOnce).to.be.true;
expect(call.args[0]).to.equal(ns);
expect(call.args[1]).to.equal(cursorId);
expect(call.args[2]).to.deep.equal(opts);
done();
};
operation.execute(server, session, callback);
});
});

context('when the server is not the same as the instance', function () {
const getMoreStub = sinon.stub().yields(undefined);
const server = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const newServer = sinon.createStubInstance(Server, {
getMore: getMoreStub
});
const session = sinon.createStubInstance(ClientSession);
const opts = { ...options, session };
const operation = new GetMoreOperation(ns, cursorId, server, opts);

it('errors in the callback', function (done) {
const callback = error => {
expect(error).to.be.instanceOf(MongoRuntimeError);
expect(error.message).to.equal('Getmore must run on the same server operation began on');
done();
};
operation.execute(newServer, session, callback);
});
});
});

describe('#hasAspect', function () {
const server = sinon.createStubInstance(Server, {});
const operation = new GetMoreOperation(ns, cursorId, server, options);

context('when the aspect is cursor iterating', function () {
it('returns true', function () {
expect(operation.hasAspect(Aspect.CURSOR_ITERATING)).to.be.true;
});
});

context('when the aspect is read', function () {
it('returns true', function () {
expect(operation.hasAspect(Aspect.READ_OPERATION)).to.be.true;
});
});

context('when the aspect is write', function () {
it('returns false', function () {
expect(operation.hasAspect(Aspect.WRITE_OPERATION)).to.be.false;
});
});

context('when the aspect is retryable', function () {
it('returns false', function () {
expect(operation.hasAspect(Aspect.RETRYABLE)).to.be.false;
});
});
});
});