Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4069): remove 'default' from options for fullDocument field in change stream options #3169

Merged
merged 9 commits into from Mar 16, 2022
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -122,6 +122,7 @@
"check:socks5": "mocha --config test/manual/mocharc.json test/manual/socks5.test.ts",
"check:csfle": "mocha --config test/mocha_mongodb.json test/integration/client-side-encryption",
"check:snappy": "mocha test/unit/assorted/snappy.test.js",
"fix:eslint": "npm run check:eslint -- --fix",
"prepare": "node etc/prepare.js",
"preview:docs": "ts-node etc/docs/preview.ts",
"release": "standard-version -i HISTORY.md",
Expand Down
71 changes: 48 additions & 23 deletions src/change_stream.ts
Expand Up @@ -45,10 +45,20 @@ const kClosed = Symbol('closed');
/** @internal */
const kMode = Symbol('mode');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
CHANGE_STREAM_OPTIONS
);
const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
'startAfter',
'startAtOperationTime',
'fullDocument'
] as const;

const CURSOR_OPTIONS = [
'batchSize',
'maxAwaitTimeMS',
'collation',
'readPreference',
...CHANGE_STREAM_OPTIONS
] as const;

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
Expand Down Expand Up @@ -94,7 +104,7 @@ export interface PipeOptions {
* @public
*/
export interface ChangeStreamOptions extends AggregateOptions {
/** Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
/** Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
fullDocument?: string;
/** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
maxAwaitTimeMS?: number;
Expand Down Expand Up @@ -568,25 +578,50 @@ function setIsIterator<TSchema>(changeStream: ChangeStream<TSchema>): void {
}
changeStream[kMode] = 'iterator';
}

function createChangeStreamStageOptions<TSchema>(
changeStream: ChangeStream<TSchema>,
changeStreamOptions: ChangeStreamOptions
) {
const changeStreamStageOptions: Document = {
fullDocument: changeStreamOptions.fullDocument
};

for (const optionName of CHANGE_STREAM_OPTIONS) {
if (changeStreamOptions[optionName]) {
changeStreamStageOptions[optionName] = changeStreamOptions[optionName];
}
}

if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

return changeStreamStageOptions;
}
/**
* Create a new change stream cursor based on self's configuration
* @internal
*/
function createChangeStreamCursor<TSchema>(
changeStream: ChangeStream<TSchema>,
options: ChangeStreamOptions
changeStreamOptions: ChangeStreamOptions
): ChangeStreamCursor<TSchema> {
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

const changeStreamStageOptions = createChangeStreamStageOptions(
changeStream,
changeStreamOptions
);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
changeStream.pipeline
);

const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const cursorOptions: Document = {};
for (const optionName of CURSOR_OPTIONS) {
if (changeStreamOptions[optionName]) {
cursorOptions[optionName] = changeStreamOptions[optionName];
}
}

const changeStreamCursor = new ChangeStreamCursor<TSchema>(
getTopology(changeStream.parent),
changeStream.namespace,
Expand All @@ -605,16 +640,6 @@ function createChangeStreamCursor<TSchema>(
return changeStreamCursor;
}

function applyKnownOptions(target: Document, source: Document, optionNames: string[]) {
optionNames.forEach(name => {
if (source[name]) {
target[name] = source[name];
}
});

return target;
}

interface TopologyWaitOptions {
start?: number;
timeout?: number;
Expand Down
72 changes: 72 additions & 0 deletions test/integration/change-streams/change_stream.test.js
Expand Up @@ -187,6 +187,78 @@ describe('Change Streams', function () {
});
afterEach(async () => await mock.cleanup());

context('ChangeStreamCursor options', function () {
let client, db, collection;

beforeEach(async function () {
client = await this.configuration.newClient().connect();
db = client.db('db');
collection = db.collection('collection');
});

afterEach(async function () {
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('fullDocument', () => {
it('sets fullDocument to `undefined` if no value is passed', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = client.watch();

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument',
undefined
);
});

it('assigns `fullDocument` to the correct value if it is passed as an option', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = client.watch([], { fullDocument: 'updateLookup' });

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument',
'updateLookup'
);
});
});

context('allChangesForCluster', () => {
it('assigns `allChangesForCluster` to `true` if the ChangeStream.type is Cluster', function () {
const changeStream = client.watch();

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster',
true
);
});

it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = db.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster'
);
});

it('does not assign `allChangesForCluster` if the ChangeStream.type is Db', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = collection.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster'
);
});
});

it('ignores any invalid option values', function () {
const changeStream = collection.watch([], { invalidOption: true });

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.invalidOption'
);
});
});

it('should close the listeners after the cursor is closed', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },

Expand Down
11 changes: 11 additions & 0 deletions test/types/change_streams.test-d.ts
@@ -0,0 +1,11 @@
import { expectError } from 'tsd';

import type { ChangeStreamOptions } from '../../src';

declare const changeStreamOptions: ChangeStreamOptions;

// TODO(NODE-4076)
// The types of `ChangeStreamOptions.fullDocument` should be strenghened to
// only allow the value `updateLookup` but this cannot be done until node v5.
// At that time, this test can be removed (or reworked if we think that's valuable).
expectError<'updateLookup' | undefined>(changeStreamOptions.fullDocument);