Skip to content

Commit

Permalink
fix(NODE-3521): update session support checks (#3151)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Mar 3, 2022
1 parent ade5ec2 commit aaa453d
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 4 deletions.
11 changes: 9 additions & 2 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -653,8 +653,15 @@ function next<T>(cursor: AbstractCursor, blocking: boolean, callback: Callback<T

if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
if (cursor[kSession] == null && cursor[kTopology].hasSessionSupport()) {
cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: false });
if (cursor[kSession] == null) {
if (cursor[kTopology].shouldCheckForSessionSupport()) {
return cursor[kTopology].selectServer(ReadPreference.primaryPreferred, err => {
if (err) return callback(err);
return next(cursor, blocking, callback);
});
} else if (cursor[kTopology].hasSessionSupport()) {
cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: false });
}
}

cursor._initialize(cursor[kSession], (err, state) => {
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/aggregation_cursor.ts
Expand Up @@ -61,7 +61,7 @@ export class AggregationCursor<TSchema = Document> extends AbstractCursor<TSchem
}

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(this.namespace, this[kPipeline], {
...this[kOptions],
...this.cursorOptions,
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/find_cursor.ts
Expand Up @@ -68,7 +68,7 @@ export class FindCursor<TSchema = Document> extends AbstractCursor<TSchema> {
}

/** @internal */
_initialize(session: ClientSession | undefined, callback: Callback<ExecutionResult>): void {
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const findOperation = new FindOperation(undefined, this.namespace, this[kFilter], {
...this[kBuiltOptions], // NOTE: order matters here, we may need to refine this
...this.cursorOptions,
Expand Down
138 changes: 138 additions & 0 deletions test/unit/cursor/aggregation_cursor.test.js
@@ -0,0 +1,138 @@
'use strict';

const { expect } = require('chai');
const mock = require('../../tools/mongodb-mock/index');
const { Topology } = require('../../../src/sdam/topology');
const { Long } = require('bson');
const { MongoDBNamespace, isHello } = require('../../../src/utils');
const { AggregationCursor } = require('../../../src/cursor/aggregation_cursor');

const test = {};
describe('Aggregation Cursor', function () {
describe('#next', function () {
afterEach(function () {
mock.cleanup();
});
beforeEach(async function () {
test.server = await mock.createServer();
});

context('when there is a data bearing server', function () {
beforeEach(function () {
test.server.setMessageHandler(request => {
const doc = request.document;
if (isHello(doc)) {
request.reply(mock.HELLO);
} else if (doc.aggregate) {
request.reply({
cursor: {
id: Long.fromNumber(1),
ns: 'test.test',
firstBatch: [{ _id: 1, name: 'test' }]
},
ok: 1
});
}
});
});

it('sets the session on the cursor', function (done) {
const topology = new Topology(test.server.hostAddress());
const cursor = new AggregationCursor(
topology,
MongoDBNamespace.fromString('test.test'),
[],
{}
);
topology.connect(function () {
cursor.next(function () {
expect(cursor.session).to.exist;
topology.close(done);
});
});
});
});

context('when there is no data bearing server', function () {
beforeEach(function () {
test.server.setMessageHandler(request => {
const doc = request.document;
if (isHello(doc)) {
request.reply({ errmsg: 'network error' });
} else if (doc.aggregate) {
request.reply({
cursor: {
id: Long.fromNumber(1),
ns: 'test.test',
firstBatch: [{ _id: 1, name: 'test' }]
},
ok: 1
});
}
});
});

it('does not set the session on the cursor', function (done) {
const topology = new Topology(test.server.hostAddress(), {
serverSelectionTimeoutMS: 1000
});
const cursor = new AggregationCursor(
topology,
MongoDBNamespace.fromString('test.test'),
[],
{}
);
topology.connect(function () {
cursor.next(function () {
expect(cursor.session).to.not.exist;
topology.close(done);
});
});
});
});

context('when a data bearing server becomes available', function () {
beforeEach(function () {
// Set the count of times hello has been called.
let helloCalls = 0;
test.server.setMessageHandler(request => {
const doc = request.document;
if (isHello(doc)) {
// After the first hello call errors indicating no data bearing server is
// available, any subsequent hello call should succeed after server selection.
// This gives us a data bearing server available for the next call.
request.reply(helloCalls > 0 ? mock.HELLO : { errmsg: 'network error' });
helloCalls++;
} else if (doc.aggregate) {
request.reply({
cursor: {
id: Long.fromNumber(1),
ns: 'test.test',
firstBatch: [{ _id: 1, name: 'test' }]
},
ok: 1
});
}
});
});

it('sets the session on the cursor', function (done) {
const topology = new Topology(test.server.hostAddress(), {
serverSelectionTimeoutMS: 1000
});
const cursor = new AggregationCursor(
topology,
MongoDBNamespace.fromString('test.test'),
[],
{}
);
topology.connect(function () {
cursor.next(function () {
expect(cursor.session).to.exist;
topology.close(done);
});
});
});
});
});
});
112 changes: 112 additions & 0 deletions test/unit/cursor/find_cursor.test.js
Expand Up @@ -10,6 +10,118 @@ const { FindCursor } = require('../../../src/cursor/find_cursor');

const test = {};
describe('Find Cursor', function () {
describe('#next', function () {
afterEach(function () {
mock.cleanup();
});
beforeEach(async function () {
test.server = await mock.createServer();
});

context('when there is a data bearing server', function () {
beforeEach(function () {
test.server.setMessageHandler(request => {
const doc = request.document;
if (isHello(doc)) {
request.reply(mock.HELLO);
} else if (doc.find) {
request.reply({
cursor: {
id: Long.fromNumber(1),
ns: 'test.test',
firstBatch: [{ _id: 1, name: 'test' }]
},
ok: 1
});
}
});
});

it('sets the session on the cursor', function (done) {
const topology = new Topology(test.server.hostAddress());
const cursor = new FindCursor(topology, MongoDBNamespace.fromString('test.test'), {}, {});
topology.connect(function () {
cursor.next(function () {
expect(cursor.session).to.exist;
topology.close(done);
});
});
});
});

context('when there is no data bearing server', function () {
beforeEach(function () {
test.server.setMessageHandler(request => {
const doc = request.document;
if (isHello(doc)) {
request.reply({ errmsg: 'network error' });
} else if (doc.find) {
request.reply({
cursor: {
id: Long.fromNumber(1),
ns: 'test.test',
firstBatch: [{ _id: 1, name: 'test' }]
},
ok: 1
});
}
});
});

it('does not set the session on the cursor', function (done) {
const topology = new Topology(test.server.hostAddress(), {
serverSelectionTimeoutMS: 1000
});
const cursor = new FindCursor(topology, MongoDBNamespace.fromString('test.test'), {}, {});
topology.connect(function () {
cursor.next(function () {
expect(cursor.session).to.not.exist;
topology.close(done);
});
});
});
});

context('when a data bearing server becomes available', function () {
beforeEach(function () {
// Set the count of times hello has been called.
let helloCalls = 0;
test.server.setMessageHandler(request => {
const doc = request.document;
if (isHello(doc)) {
// After the first hello call errors indicating no data bearing server is
// available, any subsequent hello call should succeed after server selection.
// This gives us a data bearing server available for the next call.
request.reply(helloCalls > 0 ? mock.HELLO : { errmsg: 'network error' });
helloCalls++;
} else if (doc.find) {
request.reply({
cursor: {
id: Long.fromNumber(1),
ns: 'test.test',
firstBatch: [{ _id: 1, name: 'test' }]
},
ok: 1
});
}
});
});

it('sets the session on the cursor', function (done) {
const topology = new Topology(test.server.hostAddress(), {
serverSelectionTimeoutMS: 1000
});
const cursor = new FindCursor(topology, MongoDBNamespace.fromString('test.test'), {}, {});
topology.connect(function () {
cursor.next(function () {
expect(cursor.session).to.exist;
topology.close(done);
});
});
});
});
});

describe('Response', function () {
afterEach(() => mock.cleanup());
beforeEach(() => {
Expand Down

0 comments on commit aaa453d

Please sign in to comment.