Skip to content

Commit

Permalink
fix(NODE-4069): remove 'default' from options for fullDocument field …
Browse files Browse the repository at this point in the history
…in change stream options (#3169)
  • Loading branch information
baileympearson committed Mar 16, 2022
1 parent d43bd10 commit 799689e
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 27 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -122,6 +122,7 @@
"check:socks5": "mocha --config test/manual/mocharc.json test/manual/socks5.test.ts",
"check:csfle": "mocha --config test/mocha_mongodb.json test/integration/client-side-encryption",
"check:snappy": "mocha test/unit/assorted/snappy.test.js",
"fix:eslint": "npm run check:eslint -- --fix",
"prepare": "node etc/prepare.js",
"preview:docs": "ts-node etc/docs/preview.ts",
"release": "standard-version -i HISTORY.md",
Expand Down
63 changes: 36 additions & 27 deletions src/change_stream.ts
Expand Up @@ -45,10 +45,20 @@ const kClosed = Symbol('closed');
/** @internal */
const kMode = Symbol('mode');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
CHANGE_STREAM_OPTIONS
);
const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
'startAfter',
'startAtOperationTime',
'fullDocument'
] as const;

const CURSOR_OPTIONS = [
'batchSize',
'maxAwaitTimeMS',
'collation',
'readPreference',
...CHANGE_STREAM_OPTIONS
] as const;

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
Expand All @@ -68,6 +78,8 @@ export interface ResumeOptions {
maxAwaitTimeMS?: number;
collation?: CollationOptions;
readPreference?: ReadPreference;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
}

/**
Expand All @@ -94,7 +106,7 @@ export interface PipeOptions {
* @public
*/
export interface ChangeStreamOptions extends AggregateOptions {
/** Allowed values: ‘default’, ‘updateLookup. When set to updateLookup, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
/** Allowed values: 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
fullDocument?: string;
/** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
maxAwaitTimeMS?: number;
Expand Down Expand Up @@ -446,22 +458,18 @@ export class ChangeStreamCursor<TSchema extends Document = Document> extends Abs
}

get resumeOptions(): ResumeOptions {
const result = {} as ResumeOptions;
for (const optionName of CURSOR_OPTIONS) {
if (Reflect.has(this.options, optionName)) {
Reflect.set(result, optionName, Reflect.get(this.options, optionName));
}
}
const result: ResumeOptions = applyKnownOptions(this.options, CURSOR_OPTIONS);

if (this.resumeToken || this.startAtOperationTime) {
['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key =>
Reflect.deleteProperty(result, key)
);
for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) {
Reflect.deleteProperty(result, key);
}

if (this.resumeToken) {
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
Reflect.set(result, resumeKey, this.resumeToken);

result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
}
Expand Down Expand Up @@ -568,25 +576,25 @@ function setIsIterator<TSchema>(changeStream: ChangeStream<TSchema>): void {
}
changeStream[kMode] = 'iterator';
}

/**
* Create a new change stream cursor based on self's configuration
* @internal
*/
function createChangeStreamCursor<TSchema>(
changeStream: ChangeStream<TSchema>,
options: ChangeStreamOptions
options: ChangeStreamOptions | ResumeOptions
): ChangeStreamCursor<TSchema> {
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
const changeStreamStageOptions = applyKnownOptions(options, CHANGE_STREAM_OPTIONS);
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
changeStream.pipeline
);

const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const cursorOptions: ChangeStreamCursorOptions = applyKnownOptions(options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor<TSchema>(
getTopology(changeStream.parent),
changeStream.namespace,
Expand All @@ -605,16 +613,17 @@ function createChangeStreamCursor<TSchema>(
return changeStreamCursor;
}

function applyKnownOptions(target: Document, source: Document, optionNames: string[]) {
optionNames.forEach(name => {
if (source[name]) {
target[name] = source[name];
function applyKnownOptions(source: Document, options: ReadonlyArray<string>) {
const result: Document = {};

for (const option of options) {
if (source[option]) {
result[option] = source[option];
}
});
}

return target;
return result;
}

interface TopologyWaitOptions {
start?: number;
timeout?: number;
Expand Down
80 changes: 80 additions & 0 deletions test/integration/change-streams/change_stream.test.js
Expand Up @@ -187,6 +187,86 @@ describe('Change Streams', function () {
});
afterEach(async () => await mock.cleanup());

context('ChangeStreamCursor options', function () {
let client, db, collection;

beforeEach(async function () {
client = await this.configuration.newClient().connect();
db = client.db('db');
collection = db.collection('collection');
});

afterEach(async function () {
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('fullDocument', () => {
it('does not set fullDocument if no value is provided', function () {
const changeStream = client.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument'
);
});

it('does not validate the value passed in for the `fullDocument` property', function () {
const changeStream = client.watch([], { fullDocument: 'invalid value' });

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument',
'invalid value'
);
});

it('assigns `fullDocument` to the correct value if it is passed as an option', function () {
const changeStream = client.watch([], { fullDocument: 'updateLookup' });

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument',
'updateLookup'
);
});
});

context('allChangesForCluster', () => {
it('assigns `allChangesForCluster` to `true` if the ChangeStream.type is Cluster', function () {
const changeStream = client.watch();

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster',
true
);
});

it('does not assign `allChangesForCluster` if the ChangeStream.type is Db', function () {
const changeStream = db.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster'
);
});

it('does not assign `allChangesForCluster` if the ChangeStream.type is Collection', function () {
const changeStream = collection.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster'
);
});
});

it('ignores any invalid option values', function () {
const changeStream = collection.watch([], { invalidOption: true });

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.invalidOption'
);
});
});

it('should close the listeners after the cursor is closed', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },

Expand Down
9 changes: 9 additions & 0 deletions test/types/change_stream.test-d.ts
@@ -0,0 +1,9 @@
import { expectType } from 'tsd';

import type { ChangeStreamOptions } from '../../src';

declare const changeStreamOptions: ChangeStreamOptions;

// The change stream spec says that we cannot throw an error for invalid values to `fullDocument`
// for future compatability. This means we must leave `fullDocument` as type string.
expectType<string | undefined>(changeStreamOptions.fullDocument);

0 comments on commit 799689e

Please sign in to comment.