From 47adfb3db6e38e153e8e78f2b6e4463754775cc6 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 31 May 2022 15:19:07 -0400 Subject: [PATCH] feat(NODE-4081): fix and deprecate change stream resume options (#3270) --- src/change_stream.ts | 49 +++-- test/unit/change_stream.test.ts | 304 ++++++++++++++++++++++++++++++++ 2 files changed, 328 insertions(+), 25 deletions(-) create mode 100644 test/unit/change_stream.test.ts diff --git a/src/change_stream.ts b/src/change_stream.ts index bb9c083eb7..ca846167ec 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -54,15 +54,6 @@ const CHANGE_STREAM_OPTIONS = [ 'fullDocumentBeforeChange' ] as const; -const CURSOR_OPTIONS = [ - 'batchSize', - 'maxAwaitTimeMS', - 'collation', - 'readPreference', - 'comment', - ...CHANGE_STREAM_OPTIONS -] as const; - const CHANGE_DOMAIN_TYPES = { COLLECTION: Symbol('Collection'), DATABASE: Symbol('Database'), @@ -84,7 +75,10 @@ const NO_RESUME_TOKEN_ERROR = const NO_CURSOR_ERROR = 'ChangeStream has no cursor'; const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed'; -/** @public */ +/** + * @public + * @deprecated Please use the ChangeStreamCursorOptions type instead. + */ export interface ResumeOptions { startAtOperationTime?: Timestamp; batchSize?: number; @@ -93,6 +87,7 @@ export interface ResumeOptions { readPreference?: ReadPreference; resumeAfter?: ResumeToken; startAfter?: ResumeToken; + fullDocument?: string; } /** @@ -639,7 +634,7 @@ export class ChangeStream< * @internal */ private _createChangeStreamCursor( - options: ChangeStreamOptions | ResumeOptions + options: ChangeStreamOptions | ChangeStreamCursorOptions ): ChangeStreamCursor { const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS); if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { @@ -880,6 +875,9 @@ export interface ChangeStreamCursorOptions extends AbstractCursorOptions { startAtOperationTime?: OperationTime; resumeAfter?: ResumeToken; startAfter?: ResumeToken; + maxAwaitTimeMS?: number; + collation?: CollationOptions; + fullDocument?: string; } /** @internal */ @@ -926,25 +924,26 @@ export class ChangeStreamCursor< return this._resumeToken; } - get resumeOptions(): ResumeOptions { - const result: ResumeOptions = filterOptions(this.options, CURSOR_OPTIONS); - - if (this.resumeToken || this.startAtOperationTime) { - for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) { - Reflect.deleteProperty(result, key); - } + get resumeOptions(): ChangeStreamCursorOptions { + const options: ChangeStreamCursorOptions = { + ...this.options + }; - if (this.resumeToken) { - const resumeKey = - this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter'; + for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) { + delete options[key]; + } - result[resumeKey] = this.resumeToken; - } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) { - result.startAtOperationTime = this.startAtOperationTime; + if (this.resumeToken != null) { + if (this.options.startAfter && !this.hasReceived) { + options.startAfter = this.resumeToken; + } else { + options.resumeAfter = this.resumeToken; } + } else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) { + options.startAtOperationTime = this.startAtOperationTime; } - return result; + return options; } cacheResumeToken(resumeToken: ResumeToken): void { diff --git a/test/unit/change_stream.test.ts b/test/unit/change_stream.test.ts new file mode 100644 index 0000000000..8c0be4b226 --- /dev/null +++ b/test/unit/change_stream.test.ts @@ -0,0 +1,304 @@ +import { Long, Timestamp } from 'bson'; +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { ChangeStreamCursor } from '../../src/change_stream'; +import { MongoClient } from '../../src/mongo_client'; +import { MongoDBNamespace } from '../../src/utils'; + +describe('ChangeStreamCursor', function () { + afterEach(function () { + sinon.restore(); + }); + + describe('get resumeOptions()', function () { + context('when there is a cached resumeToken', function () { + it('copies all non-resume related options from the original cursor', function () { + const cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + { promoteBuffers: true, promoteLongs: false, maxAwaitTimeMS: 5000 } + ); + cursor.resumeToken = 'resume token'; + + const options = cursor.resumeOptions; + expect(options).to.haveOwnProperty('promoteBuffers', true); + expect(options).to.haveOwnProperty('promoteLongs', false); + expect(options).to.haveOwnProperty('maxAwaitTimeMS', 5000); + }); + + context('when the cursor was started with startAfter', function () { + let cursor: ChangeStreamCursor; + + beforeEach(function () { + cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + { startAfter: 'start after' } + ); + cursor.resumeToken = 'resume token'; + }); + + context('when the cursor has not yet returned a document', function () { + beforeEach(function () { + cursor.hasReceived = false; + }); + + it('sets the startAfter option to the cached resumeToken', function () { + expect(cursor.resumeOptions).to.haveOwnProperty('startAfter', 'resume token'); + }); + + it('does NOT set the resumeAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('resumeAfter'); + }); + + context('when the startAtOperationTime option is NOT set', function () { + it('does NOT set the startAtOperationTime option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + + context('when the startAtOperationTime option is set', function () { + it('does NOT set the startAtOperationTime option', function () { + cursor.startAtOperationTime = new Timestamp(Long.ZERO); + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + }); + + context('when the cursor has returned a document', function () { + beforeEach(function () { + cursor.hasReceived = true; + }); + + it('does NOT set the startAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAfter'); + }); + + it('sets the resumeAFter option to the cached resumeToken', function () { + expect(cursor.resumeOptions).to.haveOwnProperty('resumeAfter', 'resume token'); + }); + + context('when the startAtOperationTime option is NOT set', function () { + it('does NOT set the startAtOperationTime option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + + context('when the startAtOperationTime option is set', function () { + it('does NOT set the startAtOperationTime option', function () { + cursor.startAtOperationTime = new Timestamp(Long.ZERO); + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + }); + }); + + context('when the cursor was not initialized with startAfter set', function () { + let cursor: ChangeStreamCursor; + beforeEach(function () { + cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + {} + ); + cursor.resumeToken = 'resume token'; + }); + + it('sets the resumeAfter option to the cached resumeToken', function () { + expect(cursor.resumeOptions).to.haveOwnProperty('resumeAfter', 'resume token'); + }); + + it('does NOT set the startAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAfter'); + }); + + context('when the startAtOperationTime option is NOT set', function () { + it('does NOT set the startAtOperationTime option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + + context('when the startAtOperationTime option is set', function () { + it('does NOT set the startAtOperationTime option', function () { + cursor.startAtOperationTime = new Timestamp(Long.ZERO); + cursor.resumeToken = 'resume token'; + + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + }); + }); + + context('when there is no cached resumeToken', function () { + context('when the cursor has a saved operation time', function () { + it('copies all non-resume related options from the original cursor', function () { + const cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + { + startAfter: 'start after', + resumeAfter: 'resume after', + startAtOperationTime: new Timestamp(Long.ZERO), + promoteBuffers: true, + promoteLongs: false, + maxAwaitTimeMS: 5000 + } + ); + cursor.resumeToken = null; + + const options = cursor.resumeOptions; + expect(options).to.haveOwnProperty('promoteBuffers', true); + expect(options).to.haveOwnProperty('promoteLongs', false); + expect(options).to.haveOwnProperty('maxAwaitTimeMS', 5000); + }); + + context('when the maxWireVersion >= 7', function () { + let cursor: ChangeStreamCursor; + beforeEach(function () { + cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + { + startAfter: 'start after', + resumeAfter: 'resume after', + startAtOperationTime: new Timestamp(Long.ZERO) + } + ); + cursor.resumeToken = null; + sinon.stub(cursor, 'server').get(() => ({ hello: { maxWireVersion: 7 } })); + }); + + it('does NOT set the resumeAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('resumeAfter'); + }); + + it('does NOT set the startAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAfter'); + }); + + it('does set the startAtOperationTime option', function () { + expect(cursor.resumeOptions).to.haveOwnProperty('startAtOperationTime'); + }); + }); + + context('when the maxWireVersion < 7', function () { + let cursor: ChangeStreamCursor; + beforeEach(function () { + cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + { + startAfter: 'start after', + resumeAfter: 'resume after', + startAtOperationTime: new Timestamp(Long.ZERO) + } + ); + cursor.resumeToken = null; + sinon.stub(cursor, 'server').get(() => ({ hello: { maxWireVersion: 6 } })); + }); + + it('does NOT set the resumeAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('resumeAfter'); + }); + + it('does NOT set the startAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAfter'); + }); + + it('does NOT set the startAtOperationTime option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + }); + + context('when the cursor does NOT have a saved operation time', function () { + it('copies all non-resume related options from the original cursor', function () { + const cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + { + startAfter: 'start after', + resumeAfter: 'resume after', + promoteBuffers: true, + promoteLongs: false, + maxAwaitTimeMS: 5000 + } + ); + cursor.resumeToken = null; + + const options = cursor.resumeOptions; + expect(options).to.haveOwnProperty('promoteBuffers', true); + expect(options).to.haveOwnProperty('promoteLongs', false); + expect(options).to.haveOwnProperty('maxAwaitTimeMS', 5000); + }); + + context('when the maxWireVersion >= 7', function () { + let cursor: ChangeStreamCursor; + beforeEach(function () { + cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + { + startAfter: 'start after', + resumeAfter: 'resume after' + } + ); + cursor.resumeToken = null; + sinon.stub(cursor, 'server').get(() => ({ hello: { maxWireVersion: 7 } })); + }); + + it('does NOT set the resumeAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('resumeAfter'); + }); + + it('does NOT set the startAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAfter'); + }); + + it('does NOT set the startAtOperationTime option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + + context('when the maxWireVersion < 7', function () { + let cursor: ChangeStreamCursor; + beforeEach(function () { + cursor = new ChangeStreamCursor( + new MongoClient('mongodb://localhost:27027'), + new MongoDBNamespace('db', 'collection'), + [], + { + startAfter: 'start after', + resumeAfter: 'resume after', + startAtOperationTime: new Timestamp(Long.ZERO) + } + ); + cursor.resumeToken = null; + sinon.stub(cursor, 'server').get(() => ({ hello: { maxWireVersion: 6 } })); + }); + + it('does NOT set the resumeAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('resumeAfter'); + }); + + it('does NOT set the startAfter option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAfter'); + }); + + it('does NOT set the startAtOperationTime option', function () { + expect(cursor.resumeOptions).not.to.haveOwnProperty('startAtOperationTime'); + }); + }); + }); + }); + }); +});