Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4069): remove 'default' from options for fullDocument field in change stream options #3169

Merged
merged 9 commits into from Mar 16, 2022
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -122,6 +122,7 @@
"check:socks5": "mocha --config test/manual/mocharc.json test/manual/socks5.test.ts",
"check:csfle": "mocha --config test/mocha_mongodb.json test/integration/client-side-encryption",
"check:snappy": "mocha test/unit/assorted/snappy.test.js",
"eslint:fix": "npm run check:eslint -- --fix",
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
"prepare": "node etc/prepare.js",
"preview:docs": "ts-node etc/docs/preview.ts",
"release": "standard-version -i HISTORY.md",
Expand Down
33 changes: 22 additions & 11 deletions src/change_stream.ts
Expand Up @@ -46,9 +46,14 @@ const kClosed = Symbol('closed');
const kMode = Symbol('mode');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
CHANGE_STREAM_OPTIONS
);

const CURSOR_OPTIONS = [
'batchSize',
'maxAwaitTimeMS',
'collation',
'readPreference',
...CHANGE_STREAM_OPTIONS
];

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
Expand Down Expand Up @@ -94,8 +99,8 @@ export interface PipeOptions {
* @public
*/
export interface ChangeStreamOptions extends AggregateOptions {
/** Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
fullDocument?: string;
/** Allowed values: ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
fullDocument?: 'updateLookup';
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
/** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
maxAwaitTimeMS?: number;
/** Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/manual/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}. */
Expand Down Expand Up @@ -576,8 +581,14 @@ function createChangeStreamCursor<TSchema>(
changeStream: ChangeStream<TSchema>,
options: ChangeStreamOptions
): ChangeStreamCursor<TSchema> {
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
const changeStreamStageOptions: Document = applyKnownOptions(
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
{
fullDocument: options.fullDocument
},
options,
CHANGE_STREAM_OPTIONS
);

if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}
Expand Down Expand Up @@ -606,11 +617,11 @@ function createChangeStreamCursor<TSchema>(
}

function applyKnownOptions(target: Document, source: Document, optionNames: string[]) {
optionNames.forEach(name => {
if (source[name]) {
target[name] = source[name];
for (const option of optionNames) {
if (source[option]) {
target[option] = source[option];
}
});
}

return target;
}
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Expand Up @@ -166,7 +166,6 @@ export type {
export type { OrderedBulkOperation } from './bulk/ordered';
export type { UnorderedBulkOperation } from './bulk/unordered';
export type {
ChangeStream,
ChangeStreamCursor,
ChangeStreamCursorOptions,
ChangeStreamDocument,
Expand All @@ -178,6 +177,7 @@ export type {
ResumeToken,
UpdateDescription
} from './change_stream';
export { ChangeStream } from './change_stream';
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
export type {
AuthMechanismProperties,
MongoCredentials,
Expand Down
72 changes: 71 additions & 1 deletion test/integration/change-streams/change_stream.test.js
Expand Up @@ -7,7 +7,7 @@ const { EventCollector, getSymbolFrom } = require('../../tools/utils');
const { expect } = require('chai');

const sinon = require('sinon');
const { Long, ReadPreference, MongoNetworkError } = require('../../../src');
const { Long, ReadPreference, MongoNetworkError, ChangeStream } = require('../../../src');

const crypto = require('crypto');
const { isHello } = require('../../../src/utils');
Expand Down Expand Up @@ -187,6 +187,76 @@ describe('Change Streams', function () {
});
afterEach(async () => await mock.cleanup());

context('ChangeStreamCursor options', function () {
let client, db, collection;

beforeEach(async function () {
client = this.configuration.newClient();
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
await client.connect();
db = client.db('db');
collection = db.collection('collection');
});

afterEach(async function () {
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('fullDocument', () => {
it('sets fullDocument to `undefined` if no value is passed', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = new ChangeStream(client);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

expect(changeStream.cursor).to.haveOwnProperty('pipeline');
const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream;
expect(pipelineOptions).to.haveOwnProperty('fullDocument').to.be.undefined;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests look good to me as is, so no need to change, but I've tried to get in the habbit of using the nested feature when splunking deep into an object, at least when there are questions of nullishness

expect(changeStream).to.have.nested.property('cursor.pipeline[0].$changeStream.fullDocument', undefined);

If anything in the chain is nullish we crash without a useful assertion printout, YMMV depending, maybe this case is overkill.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with this approach. The only concern I have is that when checking for undefined, any typo in the string will cause a false negative in the test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be the case, I tried the following:

expect({a: {}}).to.have.nested.property('a.b', undefined); // throws
expect({a: {b: undefined}}).to.have.nested.property('a.b', undefined); // passes

so actually I think two of your tests need to be changed to say "expect to NOT have nested property" distinct from a defined property that is undefined

});

it('assigns `fullDocument` to the correct value if it is passed as an option', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = new ChangeStream(client, [], { fullDocument: 'updateLookup' });

expect(changeStream.cursor).to.haveOwnProperty('pipeline');
const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream;
expect(pipelineOptions).to.haveOwnProperty('fullDocument').to.equal('updateLookup');
});
});

context('allChangesForCluster', () => {
it('assigns `allChangesForCluster` to `true` if the ChangeStream.type is Cluster', function () {
const changeStream = new ChangeStream(client);

expect(changeStream.cursor).to.haveOwnProperty('pipeline');
const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream;
expect(pipelineOptions).to.haveOwnProperty('allChangesForCluster').to.be.true;
});

it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = new ChangeStream(db);

expect(changeStream.cursor).to.haveOwnProperty('pipeline');

const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream;
expect(pipelineOptions).not.to.haveOwnProperty('allChangesForCluster');
});

it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () {
const changeStream = new ChangeStream(collection);

expect(changeStream.cursor).to.haveOwnProperty('pipeline');
const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream;
expect(pipelineOptions).not.to.haveOwnProperty('allChangesForCluster');
});
});

it('ignores any invalid option values', function () {
const changeStream = new ChangeStream(collection, [], { invalidOption: true });
expect(changeStream.cursor).to.haveOwnProperty('pipeline');
const pipelineOptions = changeStream.cursor.pipeline[0].$changeStream;
expect(pipelineOptions).not.to.haveOwnProperty('invalidOption');
});
});

it('should close the listeners after the cursor is closed', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },

Expand Down