From 11a310f27203f11a7e2012a1c8b24dd7c82e8781 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 10 Mar 2022 14:32:04 -0500 Subject: [PATCH 1/9] fix: remove 'default' from options for fullDocument field in change stream options --- src/change_stream.ts | 33 ++++++---- src/index.ts | 2 +- .../change-streams/change_stream.test.js | 65 ++++++++++++++++++- 3 files changed, 87 insertions(+), 13 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 57ee4f7b5e..7a79ab58a0 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -46,9 +46,14 @@ const kClosed = Symbol('closed'); const kMode = Symbol('mode'); const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; -const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( - CHANGE_STREAM_OPTIONS -); + +const CURSOR_OPTIONS = [ + 'batchSize', + 'maxAwaitTimeMS', + 'collation', + 'readPreference', + ...CHANGE_STREAM_OPTIONS +]; const CHANGE_DOMAIN_TYPES = { COLLECTION: Symbol('Collection'), @@ -94,8 +99,8 @@ export interface PipeOptions { * @public */ export interface ChangeStreamOptions extends AggregateOptions { - /** Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */ - fullDocument?: string; + /** Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */ + fullDocument?: 'updateLookup'; /** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */ maxAwaitTimeMS?: number; /** Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/manual/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}. */ @@ -576,8 +581,14 @@ function createChangeStreamCursor( changeStream: ChangeStream, options: ChangeStreamOptions ): ChangeStreamCursor { - const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' }; - applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS); + const changeStreamStageOptions: Document = applyKnownOptions( + { + fullDocument: options.fullDocument + }, + options, + CHANGE_STREAM_OPTIONS + ); + if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } @@ -606,11 +617,11 @@ function createChangeStreamCursor( } function applyKnownOptions(target: Document, source: Document, optionNames: string[]) { - optionNames.forEach(name => { - if (source[name]) { - target[name] = source[name]; + for (const option of optionNames) { + if (source[option]) { + target[option] = source[option]; } - }); + } return target; } diff --git a/src/index.ts b/src/index.ts index 0502109519..f55e0a773f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -166,7 +166,6 @@ export type { export type { OrderedBulkOperation } from './bulk/ordered'; export type { UnorderedBulkOperation } from './bulk/unordered'; export type { - ChangeStream, ChangeStreamCursor, ChangeStreamCursorOptions, ChangeStreamDocument, @@ -178,6 +177,7 @@ export type { ResumeToken, UpdateDescription } from './change_stream'; +export { ChangeStream } from './change_stream'; export type { AuthMechanismProperties, MongoCredentials, diff --git a/test/integration/change-streams/change_stream.test.js b/test/integration/change-streams/change_stream.test.js index 892bc9e215..b077874243 100644 --- a/test/integration/change-streams/change_stream.test.js +++ b/test/integration/change-streams/change_stream.test.js @@ -7,7 +7,7 @@ const { EventCollector, getSymbolFrom } = require('../../tools/utils'); const { expect } = require('chai'); const sinon = require('sinon'); -const { Long, ReadPreference, MongoNetworkError } = require('../../../src'); +const { Long, ReadPreference, MongoNetworkError, ChangeStream } = require('../../../src'); const crypto = require('crypto'); const { isHello } = require('../../../src/utils'); @@ -187,6 +187,69 @@ describe('Change Streams', function () { }); afterEach(async () => await mock.cleanup()); + context('ChangeStreamCursor options', function () { + let client, db, collection; + + beforeEach(async function () { + client = this.configuration.newClient(); + await client.connect(); + db = client.db('db'); + collection = db.collection('collection'); + }); + + afterEach(async function () { + await client.close(); + client = undefined; + db = undefined; + collection = undefined; + }); + + context('fullDocument', () => { + it('sets fullDocument to `undefined` if no value is passed', function () { + const changeStream = new ChangeStream(client); + + expect(changeStream.cursor).to.haveOwnProperty('pipeline'); + const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; + expect(pipelineOptions).to.haveOwnProperty('fullDocument').to.be.undefined; + }); + + it('assigns `fullDocument` to the correct value if it is passed as an option', function () { + const changeStream = new ChangeStream(client, [], { fullDocument: 'updateLookup' }); + + expect(changeStream.cursor).to.haveOwnProperty('pipeline'); + const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; + expect(pipelineOptions).to.haveOwnProperty('fullDocument').to.equal('updateLookup'); + }); + }); + + context('allChangesForCluster', () => { + it('assigns `allChangesForCluster` to `true` if the ChangeStream.type is Cluster', function () { + const changeStream = new ChangeStream(client); + + expect(changeStream.cursor).to.haveOwnProperty('pipeline'); + const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; + expect(pipelineOptions).to.haveOwnProperty('allChangesForCluster').to.be.true; + }); + + it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () { + const changeStream = new ChangeStream(db); + + expect(changeStream.cursor).to.haveOwnProperty('pipeline'); + + const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; + expect(pipelineOptions).not.to.haveOwnProperty('allChangesForCluster'); + }); + + it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () { + const changeStream = new ChangeStream(collection); + + expect(changeStream.cursor).to.haveOwnProperty('pipeline'); + const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; + expect(pipelineOptions).not.to.haveOwnProperty('allChangesForCluster'); + }); + }); + }); + it('should close the listeners after the cursor is closed', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, From 516411e1164f0405133a27816b72503294294a44 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 10 Mar 2022 14:44:25 -0500 Subject: [PATCH 2/9] fix: add test case for invalid property --- package.json | 1 + test/integration/change-streams/change_stream.test.js | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/package.json b/package.json index feab85f634..9304f54492 100644 --- a/package.json +++ b/package.json @@ -122,6 +122,7 @@ "check:socks5": "mocha --config test/manual/mocharc.json test/manual/socks5.test.ts", "check:csfle": "mocha --config test/mocha_mongodb.json test/integration/client-side-encryption", "check:snappy": "mocha test/unit/assorted/snappy.test.js", + "eslint:fix": "npm run check:eslint -- --fix", "prepare": "node etc/prepare.js", "preview:docs": "ts-node etc/docs/preview.ts", "release": "standard-version -i HISTORY.md", diff --git a/test/integration/change-streams/change_stream.test.js b/test/integration/change-streams/change_stream.test.js index b077874243..92d5a2f9af 100644 --- a/test/integration/change-streams/change_stream.test.js +++ b/test/integration/change-streams/change_stream.test.js @@ -248,6 +248,13 @@ describe('Change Streams', function () { expect(pipelineOptions).not.to.haveOwnProperty('allChangesForCluster'); }); }); + + it('ignores any invalid option values', function () { + const changeStream = new ChangeStream(collection, [], { invalidOption: true }); + expect(changeStream.cursor).to.haveOwnProperty('pipeline'); + const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; + expect(pipelineOptions).not.to.haveOwnProperty('invalidOption'); + }); }); it('should close the listeners after the cursor is closed', { From 842857d2af5cff1ccccd960150b313f271cf7d2c Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 11 Mar 2022 15:09:36 -0500 Subject: [PATCH 3/9] Address Neal's comments --- package.json | 2 +- src/change_stream.ts | 66 +++++++++++-------- src/index.ts | 2 +- .../change-streams/change_stream.test.js | 59 +++++++++-------- test/types/change_streams.test-d.ts | 11 ++++ 5 files changed, 84 insertions(+), 56 deletions(-) create mode 100644 test/types/change_streams.test-d.ts diff --git a/package.json b/package.json index 9304f54492..f167de6113 100644 --- a/package.json +++ b/package.json @@ -122,7 +122,7 @@ "check:socks5": "mocha --config test/manual/mocharc.json test/manual/socks5.test.ts", "check:csfle": "mocha --config test/mocha_mongodb.json test/integration/client-side-encryption", "check:snappy": "mocha test/unit/assorted/snappy.test.js", - "eslint:fix": "npm run check:eslint -- --fix", + "fix:eslint": "npm run check:eslint -- --fix", "prepare": "node etc/prepare.js", "preview:docs": "ts-node etc/docs/preview.ts", "release": "standard-version -i HISTORY.md", diff --git a/src/change_stream.ts b/src/change_stream.ts index 7a79ab58a0..085a7d6157 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -45,7 +45,12 @@ const kClosed = Symbol('closed'); /** @internal */ const kMode = Symbol('mode'); -const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; +const CHANGE_STREAM_OPTIONS = [ + 'resumeAfter', + 'startAfter', + 'startAtOperationTime', + 'fullDocument' +] as const; const CURSOR_OPTIONS = [ 'batchSize', @@ -53,7 +58,7 @@ const CURSOR_OPTIONS = [ 'collation', 'readPreference', ...CHANGE_STREAM_OPTIONS -]; +] as const; const CHANGE_DOMAIN_TYPES = { COLLECTION: Symbol('Collection'), @@ -100,7 +105,7 @@ export interface PipeOptions { */ export interface ChangeStreamOptions extends AggregateOptions { /** Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */ - fullDocument?: 'updateLookup'; + fullDocument?: string; /** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */ maxAwaitTimeMS?: number; /** Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/manual/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}. */ @@ -573,31 +578,50 @@ function setIsIterator(changeStream: ChangeStream): void { } changeStream[kMode] = 'iterator'; } + +function createChangeStreamStageOptions( + changeStream: ChangeStream, + changeStreamOptions: ChangeStreamOptions +) { + const changeStreamStageOptions: Document = { + fullDocument: changeStreamOptions.fullDocument + }; + + for (const optionName of CHANGE_STREAM_OPTIONS) { + if (changeStreamOptions[optionName]) { + changeStreamStageOptions[optionName] = changeStreamOptions[optionName]; + } + } + + if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) { + changeStreamStageOptions.allChangesForCluster = true; + } + + return changeStreamStageOptions; +} /** * Create a new change stream cursor based on self's configuration * @internal */ function createChangeStreamCursor( changeStream: ChangeStream, - options: ChangeStreamOptions + changeStreamOptions: ChangeStreamOptions ): ChangeStreamCursor { - const changeStreamStageOptions: Document = applyKnownOptions( - { - fullDocument: options.fullDocument - }, - options, - CHANGE_STREAM_OPTIONS + const changeStreamStageOptions = createChangeStreamStageOptions( + changeStream, + changeStreamOptions ); - - if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) { - changeStreamStageOptions.allChangesForCluster = true; - } - const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat( changeStream.pipeline ); - const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS); + const cursorOptions: Document = {}; + for (const optionName of CURSOR_OPTIONS) { + if (changeStreamOptions[optionName]) { + cursorOptions[optionName] = changeStreamOptions[optionName]; + } + } + const changeStreamCursor = new ChangeStreamCursor( getTopology(changeStream.parent), changeStream.namespace, @@ -616,16 +640,6 @@ function createChangeStreamCursor( return changeStreamCursor; } -function applyKnownOptions(target: Document, source: Document, optionNames: string[]) { - for (const option of optionNames) { - if (source[option]) { - target[option] = source[option]; - } - } - - return target; -} - interface TopologyWaitOptions { start?: number; timeout?: number; diff --git a/src/index.ts b/src/index.ts index f55e0a773f..0502109519 100644 --- a/src/index.ts +++ b/src/index.ts @@ -166,6 +166,7 @@ export type { export type { OrderedBulkOperation } from './bulk/ordered'; export type { UnorderedBulkOperation } from './bulk/unordered'; export type { + ChangeStream, ChangeStreamCursor, ChangeStreamCursorOptions, ChangeStreamDocument, @@ -177,7 +178,6 @@ export type { ResumeToken, UpdateDescription } from './change_stream'; -export { ChangeStream } from './change_stream'; export type { AuthMechanismProperties, MongoCredentials, diff --git a/test/integration/change-streams/change_stream.test.js b/test/integration/change-streams/change_stream.test.js index 92d5a2f9af..a42cd2b8c1 100644 --- a/test/integration/change-streams/change_stream.test.js +++ b/test/integration/change-streams/change_stream.test.js @@ -7,7 +7,7 @@ const { EventCollector, getSymbolFrom } = require('../../tools/utils'); const { expect } = require('chai'); const sinon = require('sinon'); -const { Long, ReadPreference, MongoNetworkError, ChangeStream } = require('../../../src'); +const { Long, ReadPreference, MongoNetworkError } = require('../../../src'); const crypto = require('crypto'); const { isHello } = require('../../../src/utils'); @@ -166,7 +166,7 @@ const pipeline = [ { $addFields: { comment: 'The documentKey field has been projected out of this document.' } } ]; -describe('Change Streams', function () { +describe.only('Change Streams', function () { before(async function () { return await setupDatabase(this.configuration, ['integration_tests']); }); @@ -206,54 +206,57 @@ describe('Change Streams', function () { context('fullDocument', () => { it('sets fullDocument to `undefined` if no value is passed', function () { - const changeStream = new ChangeStream(client); + const changeStream = client.watch(); - expect(changeStream.cursor).to.haveOwnProperty('pipeline'); - const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; - expect(pipelineOptions).to.haveOwnProperty('fullDocument').to.be.undefined; + expect(changeStream).to.have.nested.property( + 'cursor.pipeline[0].$changeStream.fullDocument', + undefined + ); }); it('assigns `fullDocument` to the correct value if it is passed as an option', function () { - const changeStream = new ChangeStream(client, [], { fullDocument: 'updateLookup' }); + const changeStream = client.watch([], { fullDocument: 'updateLookup' }); - expect(changeStream.cursor).to.haveOwnProperty('pipeline'); - const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; - expect(pipelineOptions).to.haveOwnProperty('fullDocument').to.equal('updateLookup'); + expect(changeStream).to.have.nested.property( + 'cursor.pipeline[0].$changeStream.fullDocument', + 'updateLookup' + ); }); }); context('allChangesForCluster', () => { it('assigns `allChangesForCluster` to `true` if the ChangeStream.type is Cluster', function () { - const changeStream = new ChangeStream(client); + const changeStream = client.watch(); - expect(changeStream.cursor).to.haveOwnProperty('pipeline'); - const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; - expect(pipelineOptions).to.haveOwnProperty('allChangesForCluster').to.be.true; + expect(changeStream).to.have.nested.property( + 'cursor.pipeline[0].$changeStream.allChangesForCluster', + true + ); }); it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () { - const changeStream = new ChangeStream(db); + const changeStream = db.watch(); - expect(changeStream.cursor).to.haveOwnProperty('pipeline'); - - const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; - expect(pipelineOptions).not.to.haveOwnProperty('allChangesForCluster'); + expect(changeStream).not.to.have.nested.property( + 'cursor.pipeline[0].$changeStream.allChangesForCluster' + ); }); - it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () { - const changeStream = new ChangeStream(collection); + it('does not assign `allChangesForCluster` if the ChangeStream.type is Db', function () { + const changeStream = collection.watch(); - expect(changeStream.cursor).to.haveOwnProperty('pipeline'); - const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; - expect(pipelineOptions).not.to.haveOwnProperty('allChangesForCluster'); + expect(changeStream).not.to.have.nested.property( + 'cursor.pipeline[0].$changeStream.allChangesForCluster' + ); }); }); it('ignores any invalid option values', function () { - const changeStream = new ChangeStream(collection, [], { invalidOption: true }); - expect(changeStream.cursor).to.haveOwnProperty('pipeline'); - const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream; - expect(pipelineOptions).not.to.haveOwnProperty('invalidOption'); + const changeStream = collection.watch([], { invalidOption: true }); + + expect(changeStream).not.to.have.nested.property( + 'cursor.pipeline[0].$changeStream.invalidOption' + ); }); }); diff --git a/test/types/change_streams.test-d.ts b/test/types/change_streams.test-d.ts new file mode 100644 index 0000000000..98de76c29b --- /dev/null +++ b/test/types/change_streams.test-d.ts @@ -0,0 +1,11 @@ +import { expectError } from 'tsd'; + +import type { ChangeStreamOptions } from '../../src'; + +declare const changeStreamOptions: ChangeStreamOptions; + +// TODO(NODE-4076) +// The types of `ChangeStreamOptions.fullDocument` should be strenghened to +// only allow the value `updateLookup` but this cannot be done until node v5. +// At that time, this test can be removed (or reworked if we think that's valuable). +expectError<'updateLookup' | undefined>(changeStreamOptions.fullDocument); From 7c2df8d20b2c5d86bc09fd44091567f7b5b19613 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Fri, 11 Mar 2022 15:21:28 -0500 Subject: [PATCH 4/9] fix: address neal's comments --- test/integration/change-streams/change_stream.test.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.js b/test/integration/change-streams/change_stream.test.js index a42cd2b8c1..4359d16e0a 100644 --- a/test/integration/change-streams/change_stream.test.js +++ b/test/integration/change-streams/change_stream.test.js @@ -166,7 +166,7 @@ const pipeline = [ { $addFields: { comment: 'The documentKey field has been projected out of this document.' } } ]; -describe.only('Change Streams', function () { +describe('Change Streams', function () { before(async function () { return await setupDatabase(this.configuration, ['integration_tests']); }); @@ -191,8 +191,7 @@ describe.only('Change Streams', function () { let client, db, collection; beforeEach(async function () { - client = this.configuration.newClient(); - await client.connect(); + client = await this.configuration.newClient().connect(); db = client.db('db'); collection = db.collection('collection'); }); From 9c1f665ec564543da270727196d77c808bc1f81f Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 14 Mar 2022 14:48:36 -0400 Subject: [PATCH 5/9] fix: re-add applyKnownOptions --- src/change_stream.ts | 59 +++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 37 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 085a7d6157..466dc8f31f 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -78,6 +78,8 @@ export interface ResumeOptions { maxAwaitTimeMS?: number; collation?: CollationOptions; readPreference?: ReadPreference; + resumeAfter?: ResumeToken; + startAfter?: ResumeToken; } /** @@ -104,7 +106,7 @@ export interface PipeOptions { * @public */ export interface ChangeStreamOptions extends AggregateOptions { - /** Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */ + /** Allowed values: 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */ fullDocument?: string; /** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */ maxAwaitTimeMS?: number; @@ -456,22 +458,18 @@ export class ChangeStreamCursor extends Abs } get resumeOptions(): ResumeOptions { - const result = {} as ResumeOptions; - for (const optionName of CURSOR_OPTIONS) { - if (Reflect.has(this.options, optionName)) { - Reflect.set(result, optionName, Reflect.get(this.options, optionName)); - } - } + const result: ResumeOptions = applyKnownOptions(this.options, CURSOR_OPTIONS); if (this.resumeToken || this.startAtOperationTime) { - ['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => - Reflect.deleteProperty(result, key) - ); + for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) { + Reflect.deleteProperty(result, key); + } if (this.resumeToken) { const resumeKey = this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter'; - Reflect.set(result, resumeKey, this.resumeToken); + + result[resumeKey] = this.resumeToken; } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) { result.startAtOperationTime = this.startAtOperationTime; } @@ -579,48 +577,35 @@ function setIsIterator(changeStream: ChangeStream): void { changeStream[kMode] = 'iterator'; } -function createChangeStreamStageOptions( - changeStream: ChangeStream, - changeStreamOptions: ChangeStreamOptions -) { - const changeStreamStageOptions: Document = { - fullDocument: changeStreamOptions.fullDocument - }; +function applyKnownOptions(source: Document, options: ReadonlyArray) { + const result: Document = {}; - for (const optionName of CHANGE_STREAM_OPTIONS) { - if (changeStreamOptions[optionName]) { - changeStreamStageOptions[optionName] = changeStreamOptions[optionName]; + for (const option of options) { + if (source[option]) { + result[option] = source[option]; } } - if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) { - changeStreamStageOptions.allChangesForCluster = true; - } - - return changeStreamStageOptions; + return result; } + /** * Create a new change stream cursor based on self's configuration * @internal */ function createChangeStreamCursor( changeStream: ChangeStream, - changeStreamOptions: ChangeStreamOptions + changeStreamOptions: ChangeStreamOptions | ResumeOptions ): ChangeStreamCursor { - const changeStreamStageOptions = createChangeStreamStageOptions( - changeStream, - changeStreamOptions - ); + const changeStreamStageOptions = applyKnownOptions(changeStreamOptions, CHANGE_STREAM_OPTIONS); const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat( changeStream.pipeline ); - const cursorOptions: Document = {}; - for (const optionName of CURSOR_OPTIONS) { - if (changeStreamOptions[optionName]) { - cursorOptions[optionName] = changeStreamOptions[optionName]; - } - } + const cursorOptions: ChangeStreamCursorOptions = applyKnownOptions( + changeStreamOptions, + CURSOR_OPTIONS + ); const changeStreamCursor = new ChangeStreamCursor( getTopology(changeStream.parent), From f87b53f82fef95efe9434c089f062831a2bfe742 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Mon, 14 Mar 2022 15:41:02 -0400 Subject: [PATCH 6/9] fix: address Neal's comments pt 3 --- src/change_stream.ts | 35 +++++++++---------- .../change-streams/change_stream.test.js | 5 ++- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 466dc8f31f..742065a480 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -577,35 +577,23 @@ function setIsIterator(changeStream: ChangeStream): void { changeStream[kMode] = 'iterator'; } -function applyKnownOptions(source: Document, options: ReadonlyArray) { - const result: Document = {}; - - for (const option of options) { - if (source[option]) { - result[option] = source[option]; - } - } - - return result; -} - /** * Create a new change stream cursor based on self's configuration * @internal */ function createChangeStreamCursor( changeStream: ChangeStream, - changeStreamOptions: ChangeStreamOptions | ResumeOptions + options: ChangeStreamOptions | ResumeOptions ): ChangeStreamCursor { - const changeStreamStageOptions = applyKnownOptions(changeStreamOptions, CHANGE_STREAM_OPTIONS); + const changeStreamStageOptions = applyKnownOptions(options, CHANGE_STREAM_OPTIONS); + if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) { + changeStreamStageOptions.allChangesForCluster = true; + } const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat( changeStream.pipeline ); - const cursorOptions: ChangeStreamCursorOptions = applyKnownOptions( - changeStreamOptions, - CURSOR_OPTIONS - ); + const cursorOptions: ChangeStreamCursorOptions = applyKnownOptions(options, CURSOR_OPTIONS); const changeStreamCursor = new ChangeStreamCursor( getTopology(changeStream.parent), @@ -625,6 +613,17 @@ function createChangeStreamCursor( return changeStreamCursor; } +function applyKnownOptions(source: Document, options: ReadonlyArray) { + const result: Document = {}; + + for (const option of options) { + if (source[option]) { + result[option] = source[option]; + } + } + + return result; +} interface TopologyWaitOptions { start?: number; timeout?: number; diff --git a/test/integration/change-streams/change_stream.test.js b/test/integration/change-streams/change_stream.test.js index 4359d16e0a..c0b5e967d3 100644 --- a/test/integration/change-streams/change_stream.test.js +++ b/test/integration/change-streams/change_stream.test.js @@ -207,9 +207,8 @@ describe('Change Streams', function () { it('sets fullDocument to `undefined` if no value is passed', function () { const changeStream = client.watch(); - expect(changeStream).to.have.nested.property( - 'cursor.pipeline[0].$changeStream.fullDocument', - undefined + expect(changeStream).not.to.have.nested.property( + 'cursor.pipeline[0].$changeStream.fullDocument' ); }); From 5429180d87bcfcbda68ad2effab7cb3e9fb31623 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 15 Mar 2022 09:52:59 -0400 Subject: [PATCH 7/9] fix: Address Daria's comments on PR --- .../change-streams/change_stream.test.js | 15 ++++++++++++--- test/types/change_streams.test-d.ts | 11 ----------- 2 files changed, 12 insertions(+), 14 deletions(-) delete mode 100644 test/types/change_streams.test-d.ts diff --git a/test/integration/change-streams/change_stream.test.js b/test/integration/change-streams/change_stream.test.js index c0b5e967d3..33e474647b 100644 --- a/test/integration/change-streams/change_stream.test.js +++ b/test/integration/change-streams/change_stream.test.js @@ -204,7 +204,7 @@ describe('Change Streams', function () { }); context('fullDocument', () => { - it('sets fullDocument to `undefined` if no value is passed', function () { + it('does not set fullDocument if no value is provided', function () { const changeStream = client.watch(); expect(changeStream).not.to.have.nested.property( @@ -212,6 +212,15 @@ describe('Change Streams', function () { ); }); + it('does not validate the value passed in for the `fullDocument` property', function () { + const changeStream = client.watch([], { fullDocument: 'invalid value' }); + + expect(changeStream).to.have.nested.property( + 'cursor.pipeline[0].$changeStream.fullDocument', + 'invalid value' + ); + }); + it('assigns `fullDocument` to the correct value if it is passed as an option', function () { const changeStream = client.watch([], { fullDocument: 'updateLookup' }); @@ -232,7 +241,7 @@ describe('Change Streams', function () { ); }); - it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () { + it('does not assign `allChangesForCluster` if the ChangeStream.type is Db', function () { const changeStream = db.watch(); expect(changeStream).not.to.have.nested.property( @@ -240,7 +249,7 @@ describe('Change Streams', function () { ); }); - it('does not assign `allChangesForCluster` if the ChangeStream.type is Db', function () { + it('does not assign `allChangesForCluster` if the ChangeStream.type is Collection', function () { const changeStream = collection.watch(); expect(changeStream).not.to.have.nested.property( diff --git a/test/types/change_streams.test-d.ts b/test/types/change_streams.test-d.ts deleted file mode 100644 index 98de76c29b..0000000000 --- a/test/types/change_streams.test-d.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { expectError } from 'tsd'; - -import type { ChangeStreamOptions } from '../../src'; - -declare const changeStreamOptions: ChangeStreamOptions; - -// TODO(NODE-4076) -// The types of `ChangeStreamOptions.fullDocument` should be strenghened to -// only allow the value `updateLookup` but this cannot be done until node v5. -// At that time, this test can be removed (or reworked if we think that's valuable). -expectError<'updateLookup' | undefined>(changeStreamOptions.fullDocument); From a4ed729c43b804ce4a87b3e712e8ad3f17968831 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 15 Mar 2022 10:02:11 -0400 Subject: [PATCH 8/9] Add back TS test --- test/types/change_stream.test-d.ts | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 test/types/change_stream.test-d.ts diff --git a/test/types/change_stream.test-d.ts b/test/types/change_stream.test-d.ts new file mode 100644 index 0000000000..a50852859a --- /dev/null +++ b/test/types/change_stream.test-d.ts @@ -0,0 +1,9 @@ +import { expectError } from 'tsd'; + +import type { ChangeStreamOptions } from '../../src'; + +declare const changeStreamOptions: ChangeStreamOptions; + +// The change stream spec says that we cannot throw an error for invalid values to `fullDocument` +// for future compatability. This means we must leave `fullDocument` as type string. +expectError<'updateLookup' | undefined>(changeStreamOptions.fullDocument); From f50c91af9a181ae6519835132a1e1ec22b666ded Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 16 Mar 2022 15:25:29 -0400 Subject: [PATCH 9/9] fix: use 'expectType' instead of 'expectError' in type test --- test/types/change_stream.test-d.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/types/change_stream.test-d.ts b/test/types/change_stream.test-d.ts index a50852859a..07efa9a378 100644 --- a/test/types/change_stream.test-d.ts +++ b/test/types/change_stream.test-d.ts @@ -1,4 +1,4 @@ -import { expectError } from 'tsd'; +import { expectType } from 'tsd'; import type { ChangeStreamOptions } from '../../src'; @@ -6,4 +6,4 @@ declare const changeStreamOptions: ChangeStreamOptions; // The change stream spec says that we cannot throw an error for invalid values to `fullDocument` // for future compatability. This means we must leave `fullDocument` as type string. -expectError<'updateLookup' | undefined>(changeStreamOptions.fullDocument); +expectType(changeStreamOptions.fullDocument);