Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4103): respect BSON options when creating change streams #3247

Merged
merged 9 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,6 @@ export class ChangeStream<
}
const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline];

const cursorOptions: ChangeStreamCursorOptions = filterOptions(options, CURSOR_OPTIONS);

const client: MongoClient | null =
this.type === CHANGE_DOMAIN_TYPES.CLUSTER
? (this.parent as MongoClient)
Expand All @@ -669,7 +667,7 @@ export class ChangeStream<
client,
this.namespace,
pipeline,
cursorOptions
options
);

for (const event of CHANGE_STREAM_EVENTS) {
Expand Down
123 changes: 122 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { strict as assert } from 'assert';
import { expect } from 'chai';
import * as crypto from 'crypto';
import { once } from 'events';
import * as sinon from 'sinon';
import { PassThrough, Transform } from 'stream';

import {
ChangeStream,
ChangeStreamOptions,
Collection,
Db,
Long,
MongoClient,
MongoNetworkError,
Expand All @@ -22,7 +24,7 @@ import {
TestBuilder,
UnifiedTestSuiteBuilder
} from '../../tools/utils';
import { delay, setupDatabase, withClient, withCursor } from '../shared';
import { delay, filterForCommands, setupDatabase, withClient, withCursor } from '../shared';

function withChangeStream(
callback: (collection: Collection, changeStream: ChangeStream, done: Mocha.Done) => void
Expand Down Expand Up @@ -1990,4 +1992,123 @@ describe('Change Streams', function () {
.toJSON()
)
.run();

describe('BSON Options', function () {
let client: MongoClient;
let db: Db;
let collection: Collection;
let cs: ChangeStream;
beforeEach(async function () {
client = await this.configuration.newClient({ monitorCommands: true }).connect();
db = client.db('db');
collection = await db.createCollection('collection');
});
afterEach(async function () {
await db.dropCollection('collection');
await cs.close();
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('promoteLongs', () => {
context('when set to true', () => {
it('does not convert Longs to numbers', {
metadata: { requires: { topology: '!single' } },
test: async function () {
cs = collection.watch([], { promoteLongs: true });

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;

expect(typeof change.fullDocument.a).to.equal('number');
}
});
});

context('when set to false', () => {
it('converts Long values to native numbers', {
metadata: { requires: { topology: '!single' } },
test: async function () {
cs = collection.watch([], { promoteLongs: false });

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;
expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long);
}
});
});

context('when omitted', () => {
it('defaults to true', {
metadata: { requires: { topology: '!single' } },
test: async function () {
cs = collection.watch([]);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;
expect(typeof change.fullDocument.a).to.equal('number');
}
});
});
});

context('invalid options', function () {
it('does not send invalid options on the aggregate command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
}
});

it('does not send invalid options on the getMore command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
}
});
});
});
});