From 799689e9f831dce00db8b95799491a2f873b95cf Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 16 Mar 2022 16:01:53 -0400 Subject: [PATCH] fix(NODE-4069): remove 'default' from options for fullDocument field in change stream options (#3169) --- package.json | 1 + src/change_stream.ts | 63 ++++++++------- .../change-streams/change_stream.test.js | 80 +++++++++++++++++++ test/types/change_stream.test-d.ts | 9 +++ 4 files changed, 126 insertions(+), 27 deletions(-) create mode 100644 test/types/change_stream.test-d.ts diff --git a/package.json b/package.json index feab85f634..f167de6113 100644 --- a/package.json +++ b/package.json @@ -122,6 +122,7 @@ "check:socks5": "mocha --config test/manual/mocharc.json test/manual/socks5.test.ts", "check:csfle": "mocha --config test/mocha_mongodb.json test/integration/client-side-encryption", "check:snappy": "mocha test/unit/assorted/snappy.test.js", + "fix:eslint": "npm run check:eslint -- --fix", "prepare": "node etc/prepare.js", "preview:docs": "ts-node etc/docs/preview.ts", "release": "standard-version -i HISTORY.md", diff --git a/src/change_stream.ts b/src/change_stream.ts index 57ee4f7b5e..742065a480 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -45,10 +45,20 @@ const kClosed = Symbol('closed'); /** @internal */ const kMode = Symbol('mode'); -const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; -const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( - CHANGE_STREAM_OPTIONS -); +const CHANGE_STREAM_OPTIONS = [ + 'resumeAfter', + 'startAfter', + 'startAtOperationTime', + 'fullDocument' +] as const; + +const CURSOR_OPTIONS = [ + 'batchSize', + 'maxAwaitTimeMS', + 'collation', + 'readPreference', + ...CHANGE_STREAM_OPTIONS +] as const; const CHANGE_DOMAIN_TYPES = { COLLECTION: Symbol('Collection'), @@ -68,6 +78,8 @@ export interface ResumeOptions { maxAwaitTimeMS?: number; collation?: CollationOptions; readPreference?: ReadPreference; + resumeAfter?: ResumeToken; + startAfter?: ResumeToken; } /** @@ -94,7 +106,7 @@ export interface PipeOptions { * @public */ export interface ChangeStreamOptions extends AggregateOptions { - /** Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */ + /** Allowed values: 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */ fullDocument?: string; /** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */ maxAwaitTimeMS?: number; @@ -446,22 +458,18 @@ export class ChangeStreamCursor extends Abs } get resumeOptions(): ResumeOptions { - const result = {} as ResumeOptions; - for (const optionName of CURSOR_OPTIONS) { - if (Reflect.has(this.options, optionName)) { - Reflect.set(result, optionName, Reflect.get(this.options, optionName)); - } - } + const result: ResumeOptions = applyKnownOptions(this.options, CURSOR_OPTIONS); if (this.resumeToken || this.startAtOperationTime) { - ['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => - Reflect.deleteProperty(result, key) - ); + for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) { + Reflect.deleteProperty(result, key); + } if (this.resumeToken) { const resumeKey = this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter'; - Reflect.set(result, resumeKey, this.resumeToken); + + result[resumeKey] = this.resumeToken; } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) { result.startAtOperationTime = this.startAtOperationTime; } @@ -568,25 +576,25 @@ function setIsIterator(changeStream: ChangeStream): void { } changeStream[kMode] = 'iterator'; } + /** * Create a new change stream cursor based on self's configuration * @internal */ function createChangeStreamCursor( changeStream: ChangeStream, - options: ChangeStreamOptions + options: ChangeStreamOptions | ResumeOptions ): ChangeStreamCursor { - const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' }; - applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS); + const changeStreamStageOptions = applyKnownOptions(options, CHANGE_STREAM_OPTIONS); if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } - const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat( changeStream.pipeline ); - const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS); + const cursorOptions: ChangeStreamCursorOptions = applyKnownOptions(options, CURSOR_OPTIONS); + const changeStreamCursor = new ChangeStreamCursor( getTopology(changeStream.parent), changeStream.namespace, @@ -605,16 +613,17 @@ function createChangeStreamCursor( return changeStreamCursor; } -function applyKnownOptions(target: Document, source: Document, optionNames: string[]) { - optionNames.forEach(name => { - if (source[name]) { - target[name] = source[name]; +function applyKnownOptions(source: Document, options: ReadonlyArray) { + const result: Document = {}; + + for (const option of options) { + if (source[option]) { + result[option] = source[option]; } - }); + } - return target; + return result; } - interface TopologyWaitOptions { start?: number; timeout?: number; diff --git a/test/integration/change-streams/change_stream.test.js b/test/integration/change-streams/change_stream.test.js index 892bc9e215..33e474647b 100644 --- a/test/integration/change-streams/change_stream.test.js +++ b/test/integration/change-streams/change_stream.test.js @@ -187,6 +187,86 @@ describe('Change Streams', function () { }); afterEach(async () => await mock.cleanup()); + context('ChangeStreamCursor options', function () { + let client, db, collection; + + beforeEach(async function () { + client = await this.configuration.newClient().connect(); + db = client.db('db'); + collection = db.collection('collection'); + }); + + afterEach(async function () { + await client.close(); + client = undefined; + db = undefined; + collection = undefined; + }); + + context('fullDocument', () => { + it('does not set fullDocument if no value is provided', function () { + const changeStream = client.watch(); + + expect(changeStream).not.to.have.nested.property( + 'cursor.pipeline[0].$changeStream.fullDocument' + ); + }); + + it('does not validate the value passed in for the `fullDocument` property', function () { + const changeStream = client.watch([], { fullDocument: 'invalid value' }); + + expect(changeStream).to.have.nested.property( + 'cursor.pipeline[0].$changeStream.fullDocument', + 'invalid value' + ); + }); + + it('assigns `fullDocument` to the correct value if it is passed as an option', function () { + const changeStream = client.watch([], { fullDocument: 'updateLookup' }); + + expect(changeStream).to.have.nested.property( + 'cursor.pipeline[0].$changeStream.fullDocument', + 'updateLookup' + ); + }); + }); + + context('allChangesForCluster', () => { + it('assigns `allChangesForCluster` to `true` if the ChangeStream.type is Cluster', function () { + const changeStream = client.watch(); + + expect(changeStream).to.have.nested.property( + 'cursor.pipeline[0].$changeStream.allChangesForCluster', + true + ); + }); + + it('does not assign `allChangesForCluster` if the ChangeStream.type is Db', function () { + const changeStream = db.watch(); + + expect(changeStream).not.to.have.nested.property( + 'cursor.pipeline[0].$changeStream.allChangesForCluster' + ); + }); + + it('does not assign `allChangesForCluster` if the ChangeStream.type is Collection', function () { + const changeStream = collection.watch(); + + expect(changeStream).not.to.have.nested.property( + 'cursor.pipeline[0].$changeStream.allChangesForCluster' + ); + }); + }); + + it('ignores any invalid option values', function () { + const changeStream = collection.watch([], { invalidOption: true }); + + expect(changeStream).not.to.have.nested.property( + 'cursor.pipeline[0].$changeStream.invalidOption' + ); + }); + }); + it('should close the listeners after the cursor is closed', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, diff --git a/test/types/change_stream.test-d.ts b/test/types/change_stream.test-d.ts new file mode 100644 index 0000000000..07efa9a378 --- /dev/null +++ b/test/types/change_stream.test-d.ts @@ -0,0 +1,9 @@ +import { expectType } from 'tsd'; + +import type { ChangeStreamOptions } from '../../src'; + +declare const changeStreamOptions: ChangeStreamOptions; + +// The change stream spec says that we cannot throw an error for invalid values to `fullDocument` +// for future compatability. This means we must leave `fullDocument` as type string. +expectType(changeStreamOptions.fullDocument);