diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index d729dc38a8..c4969abc27 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -653,8 +653,15 @@ function next(cursor: AbstractCursor, blocking: boolean, callback: Callback { + if (err) return callback(err); + return next(cursor, blocking, callback); + }); + } else if (cursor[kTopology].hasSessionSupport()) { + cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: false }); + } } cursor._initialize(cursor[kSession], (err, state) => { diff --git a/src/cursor/aggregation_cursor.ts b/src/cursor/aggregation_cursor.ts index 281e2f0cb6..45353c114d 100644 --- a/src/cursor/aggregation_cursor.ts +++ b/src/cursor/aggregation_cursor.ts @@ -61,7 +61,7 @@ export class AggregationCursor extends AbstractCursor): void { + _initialize(session: ClientSession, callback: Callback): void { const aggregateOperation = new AggregateOperation(this.namespace, this[kPipeline], { ...this[kOptions], ...this.cursorOptions, diff --git a/src/cursor/find_cursor.ts b/src/cursor/find_cursor.ts index 154adee342..cab4ee6b20 100644 --- a/src/cursor/find_cursor.ts +++ b/src/cursor/find_cursor.ts @@ -68,7 +68,7 @@ export class FindCursor extends AbstractCursor { } /** @internal */ - _initialize(session: ClientSession | undefined, callback: Callback): void { + _initialize(session: ClientSession, callback: Callback): void { const findOperation = new FindOperation(undefined, this.namespace, this[kFilter], { ...this[kBuiltOptions], // NOTE: order matters here, we may need to refine this ...this.cursorOptions, diff --git a/test/unit/cursor/aggregation_cursor.test.js b/test/unit/cursor/aggregation_cursor.test.js new file mode 100644 index 0000000000..65a654dc09 --- /dev/null +++ b/test/unit/cursor/aggregation_cursor.test.js @@ -0,0 +1,138 @@ +'use strict'; + +const { expect } = require('chai'); +const mock = require('../../tools/mongodb-mock/index'); +const { Topology } = require('../../../src/sdam/topology'); +const { Long } = require('bson'); +const { MongoDBNamespace, isHello } = require('../../../src/utils'); +const { AggregationCursor } = require('../../../src/cursor/aggregation_cursor'); + +const test = {}; +describe('Aggregation Cursor', function () { + describe('#next', function () { + afterEach(function () { + mock.cleanup(); + }); + beforeEach(async function () { + test.server = await mock.createServer(); + }); + + context('when there is a data bearing server', function () { + beforeEach(function () { + test.server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply(mock.HELLO); + } else if (doc.aggregate) { + request.reply({ + cursor: { + id: Long.fromNumber(1), + ns: 'test.test', + firstBatch: [{ _id: 1, name: 'test' }] + }, + ok: 1 + }); + } + }); + }); + + it('sets the session on the cursor', function (done) { + const topology = new Topology(test.server.hostAddress()); + const cursor = new AggregationCursor( + topology, + MongoDBNamespace.fromString('test.test'), + [], + {} + ); + topology.connect(function () { + cursor.next(function () { + expect(cursor.session).to.exist; + topology.close(done); + }); + }); + }); + }); + + context('when there is no data bearing server', function () { + beforeEach(function () { + test.server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply({ errmsg: 'network error' }); + } else if (doc.aggregate) { + request.reply({ + cursor: { + id: Long.fromNumber(1), + ns: 'test.test', + firstBatch: [{ _id: 1, name: 'test' }] + }, + ok: 1 + }); + } + }); + }); + + it('does not set the session on the cursor', function (done) { + const topology = new Topology(test.server.hostAddress(), { + serverSelectionTimeoutMS: 1000 + }); + const cursor = new AggregationCursor( + topology, + MongoDBNamespace.fromString('test.test'), + [], + {} + ); + topology.connect(function () { + cursor.next(function () { + expect(cursor.session).to.not.exist; + topology.close(done); + }); + }); + }); + }); + + context('when a data bearing server becomes available', function () { + beforeEach(function () { + // Set the count of times hello has been called. + let helloCalls = 0; + test.server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + // After the first hello call errors indicating no data bearing server is + // available, any subsequent hello call should succeed after server selection. + // This gives us a data bearing server available for the next call. + request.reply(helloCalls > 0 ? mock.HELLO : { errmsg: 'network error' }); + helloCalls++; + } else if (doc.aggregate) { + request.reply({ + cursor: { + id: Long.fromNumber(1), + ns: 'test.test', + firstBatch: [{ _id: 1, name: 'test' }] + }, + ok: 1 + }); + } + }); + }); + + it('sets the session on the cursor', function (done) { + const topology = new Topology(test.server.hostAddress(), { + serverSelectionTimeoutMS: 1000 + }); + const cursor = new AggregationCursor( + topology, + MongoDBNamespace.fromString('test.test'), + [], + {} + ); + topology.connect(function () { + cursor.next(function () { + expect(cursor.session).to.exist; + topology.close(done); + }); + }); + }); + }); + }); +}); diff --git a/test/unit/cursor/find_cursor.test.js b/test/unit/cursor/find_cursor.test.js index 1aa8165640..4e4bb8d54b 100644 --- a/test/unit/cursor/find_cursor.test.js +++ b/test/unit/cursor/find_cursor.test.js @@ -10,6 +10,118 @@ const { FindCursor } = require('../../../src/cursor/find_cursor'); const test = {}; describe('Find Cursor', function () { + describe('#next', function () { + afterEach(function () { + mock.cleanup(); + }); + beforeEach(async function () { + test.server = await mock.createServer(); + }); + + context('when there is a data bearing server', function () { + beforeEach(function () { + test.server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply(mock.HELLO); + } else if (doc.find) { + request.reply({ + cursor: { + id: Long.fromNumber(1), + ns: 'test.test', + firstBatch: [{ _id: 1, name: 'test' }] + }, + ok: 1 + }); + } + }); + }); + + it('sets the session on the cursor', function (done) { + const topology = new Topology(test.server.hostAddress()); + const cursor = new FindCursor(topology, MongoDBNamespace.fromString('test.test'), {}, {}); + topology.connect(function () { + cursor.next(function () { + expect(cursor.session).to.exist; + topology.close(done); + }); + }); + }); + }); + + context('when there is no data bearing server', function () { + beforeEach(function () { + test.server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + request.reply({ errmsg: 'network error' }); + } else if (doc.find) { + request.reply({ + cursor: { + id: Long.fromNumber(1), + ns: 'test.test', + firstBatch: [{ _id: 1, name: 'test' }] + }, + ok: 1 + }); + } + }); + }); + + it('does not set the session on the cursor', function (done) { + const topology = new Topology(test.server.hostAddress(), { + serverSelectionTimeoutMS: 1000 + }); + const cursor = new FindCursor(topology, MongoDBNamespace.fromString('test.test'), {}, {}); + topology.connect(function () { + cursor.next(function () { + expect(cursor.session).to.not.exist; + topology.close(done); + }); + }); + }); + }); + + context('when a data bearing server becomes available', function () { + beforeEach(function () { + // Set the count of times hello has been called. + let helloCalls = 0; + test.server.setMessageHandler(request => { + const doc = request.document; + if (isHello(doc)) { + // After the first hello call errors indicating no data bearing server is + // available, any subsequent hello call should succeed after server selection. + // This gives us a data bearing server available for the next call. + request.reply(helloCalls > 0 ? mock.HELLO : { errmsg: 'network error' }); + helloCalls++; + } else if (doc.find) { + request.reply({ + cursor: { + id: Long.fromNumber(1), + ns: 'test.test', + firstBatch: [{ _id: 1, name: 'test' }] + }, + ok: 1 + }); + } + }); + }); + + it('sets the session on the cursor', function (done) { + const topology = new Topology(test.server.hostAddress(), { + serverSelectionTimeoutMS: 1000 + }); + const cursor = new FindCursor(topology, MongoDBNamespace.fromString('test.test'), {}, {}); + topology.connect(function () { + cursor.next(function () { + expect(cursor.session).to.exist; + topology.close(done); + }); + }); + }); + }); + }); + describe('Response', function () { afterEach(() => mock.cleanup()); beforeEach(() => {