Skip to content

Commit

Permalink
fix(NODE-4413): set maxTimeMS on getMores when maxAwaitTimeMS is spec…
Browse files Browse the repository at this point in the history
…ified (#3319)
  • Loading branch information
nbbeeken committed Aug 10, 2022
1 parent bc70022 commit dcbfd6e
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 144 deletions.
47 changes: 31 additions & 16 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -78,8 +78,20 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
session?: ClientSession;
readPreference?: ReadPreferenceLike;
readConcern?: ReadConcernLike;
/**
* Specifies the number of documents to return in each response from MongoDB
*/
batchSize?: number;
/**
* When applicable `maxTimeMS` controls the amount of time the initial command
* that constructs a cursor should take. (ex. find, aggregate, listCollections)
*/
maxTimeMS?: number;
/**
* When applicable `maxAwaitTimeMS` controls the amount of time subsequent getMores
* that a cursor uses to fetch more data should take. (ex. cursor.next())
*/
maxAwaitTimeMS?: number;
/**
* Comment to apply to the operation.
*
Expand All @@ -89,7 +101,19 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
* In server versions 4.4 and above, 'comment' can be any valid BSON type.
*/
comment?: unknown;
/**
* By default, MongoDB will automatically close a cursor when the
* client has exhausted all results in the cursor. However, for [capped collections](https://www.mongodb.com/docs/manual/core/capped-collections)
* you may use a Tailable Cursor that remains open after the client exhausts
* the results in the initial cursor.
*/
tailable?: boolean;
/**
* If awaitData is set to true, when the cursor reaches the end of the capped collection,
* MongoDB blocks the query thread for a period of time waiting for new data to arrive.
* When new data is inserted into the capped collection, the blocked thread is signaled
* to wake up and return the next batch to the client.
*/
awaitData?: boolean;
noCursorTimeout?: boolean;
}
Expand Down Expand Up @@ -155,7 +179,7 @@ export abstract class AbstractCursor<
}
this[kClient] = client;
this[kNamespace] = namespace;
this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
this[kDocuments] = [];
this[kInitialized] = false;
this[kClosed] = false;
this[kKilled] = false;
Expand Down Expand Up @@ -186,6 +210,10 @@ export abstract class AbstractCursor<
this[kOptions].maxTimeMS = options.maxTimeMS;
}

if (typeof options.maxAwaitTimeMS === 'number') {
this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
}

if (options.session instanceof ClientSession) {
this[kSession] = options.session;
} else {
Expand Down Expand Up @@ -617,21 +645,8 @@ export abstract class AbstractCursor<

/** @internal */
_getMore(batchSize: number, callback: Callback<Document>): void {
const cursorId = this[kId];
const cursorNs = this[kNamespace];
const server = this[kServer];

if (cursorId == null) {
callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
return;
}

if (server == null) {
callback(new MongoRuntimeError('Unable to iterate cursor without selected server'));
return;
}

const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
...this[kOptions],
session: this[kSession],
batchSize
Expand Down
6 changes: 5 additions & 1 deletion src/operations/get_more.ts
Expand Up @@ -39,7 +39,7 @@ export class GetMoreOperation extends AbstractOperation {
cursorId: Long;
override options: GetMoreOptions;

constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) {
super(options);

this.options = options;
Expand All @@ -63,6 +63,10 @@ export class GetMoreOperation extends AbstractOperation {
);
}

if (this.cursorId == null || this.cursorId.isZero()) {
return callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
}

const collection = this.ns.collection;
if (collection == null) {
// Cursors should have adopted the namespace returned by MongoDB
Expand Down
130 changes: 123 additions & 7 deletions test/integration/change-streams/change_stream.test.ts
Expand Up @@ -1111,7 +1111,7 @@ describe('Change Streams', function () {
changeStream.next((err, doc) => {
expect(err).to.exist;
expect(doc).to.not.exist;
expect(err.message).to.equal('ChangeStream is closed');
expect(err?.message).to.equal('ChangeStream is closed');
changeStream.close(() => client.close(done));
});
});
Expand Down Expand Up @@ -1372,23 +1372,139 @@ describe('Change Streams', function () {
)
.run();

UnifiedTestSuiteBuilder.describe('entity.watch() server-side options')
.runOnRequirement({
topologies: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'],
minServerVersion: '4.4.0'
})
.createEntities([
{ client: { id: 'client0', observeEvents: ['commandStartedEvent'] } },
{ database: { id: 'db0', client: 'client0', databaseName: 'watchOpts' } },
{ collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } }
])
.test(
TestBuilder.it(
'should use maxAwaitTimeMS option to set maxTimeMS on getMore and should not set maxTimeMS on aggregate'
)
.operation({
object: 'collection0',
name: 'createChangeStream',
saveResultAsEntity: 'changeStreamOnClient',
arguments: { maxAwaitTimeMS: 5000 }
})
.operation({
name: 'insertOne',
object: 'collection0',
arguments: { document: { a: 1 } },
ignoreResultAndError: true
})
.operation({
object: 'changeStreamOnClient',
name: 'iterateUntilDocumentOrError',
ignoreResultAndError: true
})
.expectEvents({
client: 'client0',
events: [
{
commandStartedEvent: {
commandName: 'aggregate',
command: { maxTimeMS: { $$exists: false } }
}
},
{ commandStartedEvent: { commandName: 'insert' } },
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } }
]
})
.toJSON()
)
.test(
TestBuilder.it(
'should use maxTimeMS option to set maxTimeMS on aggregate and not set maxTimeMS on getMore'
)
.operation({
object: 'collection0',
name: 'createChangeStream',
saveResultAsEntity: 'changeStreamOnClient',
arguments: { maxTimeMS: 5000 }
})
.operation({
name: 'insertOne',
object: 'collection0',
arguments: { document: { a: 1 } },
ignoreResultAndError: true
})
.operation({
object: 'changeStreamOnClient',
name: 'iterateUntilDocumentOrError',
ignoreResultAndError: true
})
.expectEvents({
client: 'client0',
ignoreExtraEvents: true, // Sharded clusters have extra getMores
events: [
{ commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } },
{ commandStartedEvent: { commandName: 'insert' } },
{
commandStartedEvent: {
commandName: 'getMore',
command: { maxTimeMS: { $$exists: false } }
}
}
]
})
.toJSON()
)
.test(
TestBuilder.it(
'should use maxTimeMS option to set maxTimeMS on aggregate and maxAwaitTimeMS option to set maxTimeMS on getMore'
)
.operation({
object: 'collection0',
name: 'createChangeStream',
saveResultAsEntity: 'changeStreamOnClient',
arguments: { maxTimeMS: 5000, maxAwaitTimeMS: 6000 }
})
.operation({
name: 'insertOne',
object: 'collection0',
arguments: { document: { a: 1 } },
ignoreResultAndError: true
})
.operation({
object: 'changeStreamOnClient',
name: 'iterateUntilDocumentOrError',
ignoreResultAndError: true
})
.expectEvents({
client: 'client0',
ignoreExtraEvents: true, // Sharded clusters have extra getMores
events: [
{ commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } },
{ commandStartedEvent: { commandName: 'insert' } },
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 6000 } } }
]
})
.toJSON()
)
.run();

describe('BSON Options', function () {
let client: MongoClient;
let db: Db;
let collection: Collection;
let cs: ChangeStream;

beforeEach(async function () {
client = await this.configuration.newClient({ monitorCommands: true }).connect();
db = client.db('db');
collection = await db.createCollection('collection');
});

afterEach(async function () {
await db.dropCollection('collection');
await cs.close();
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('promoteLongs', () => {
Expand Down Expand Up @@ -1452,7 +1568,7 @@ describe('Change Streams', function () {
it('does not send invalid options on the aggregate command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];
const started: CommandStartedEvent[] = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
Expand All @@ -1473,7 +1589,7 @@ describe('Change Streams', function () {
it('does not send invalid options on the getMore command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];
const started: CommandStartedEvent[] = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
Expand Down Expand Up @@ -1503,7 +1619,7 @@ describe('ChangeStream resumability', function () {
const changeStreamResumeOptions: ChangeStreamOptions = {
fullDocument: 'updateLookup',
collation: { locale: 'en', maxVariable: 'punct' },
maxAwaitTimeMS: 20000,
maxAwaitTimeMS: 2000,
batchSize: 200
};

Expand Down

0 comments on commit dcbfd6e

Please sign in to comment.