Skip to content

Commit

Permalink
fix(NODE-4103): respect BSON options when creating change streams (#3247
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Kwabena Ampofo committed May 23, 2022
1 parent 1261432 commit b2798d9
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 4 deletions.
4 changes: 1 addition & 3 deletions src/change_stream.ts
Expand Up @@ -647,8 +647,6 @@ export class ChangeStream<
}
const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline];

const cursorOptions: ChangeStreamCursorOptions = filterOptions(options, CURSOR_OPTIONS);

const client: MongoClient | null =
this.type === CHANGE_DOMAIN_TYPES.CLUSTER
? (this.parent as MongoClient)
Expand All @@ -669,7 +667,7 @@ export class ChangeStream<
client,
this.namespace,
pipeline,
cursorOptions
options
);

for (const event of CHANGE_STREAM_EVENTS) {
Expand Down
123 changes: 122 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
@@ -1,13 +1,15 @@
import { strict as assert } from 'assert';
import { expect } from 'chai';
import * as crypto from 'crypto';
import { once } from 'events';
import * as sinon from 'sinon';
import { PassThrough, Transform } from 'stream';

import {
ChangeStream,
ChangeStreamOptions,
Collection,
Db,
Long,
MongoClient,
MongoNetworkError,
Expand All @@ -22,7 +24,7 @@ import {
TestBuilder,
UnifiedTestSuiteBuilder
} from '../../tools/utils';
import { delay, setupDatabase, withClient, withCursor } from '../shared';
import { delay, filterForCommands, setupDatabase, withClient, withCursor } from '../shared';

function withChangeStream(
callback: (collection: Collection, changeStream: ChangeStream, done: Mocha.Done) => void
Expand Down Expand Up @@ -1990,4 +1992,123 @@ describe('Change Streams', function () {
.toJSON()
)
.run();

describe('BSON Options', function () {
let client: MongoClient;
let db: Db;
let collection: Collection;
let cs: ChangeStream;
beforeEach(async function () {
client = await this.configuration.newClient({ monitorCommands: true }).connect();
db = client.db('db');
collection = await db.createCollection('collection');
});
afterEach(async function () {
await db.dropCollection('collection');
await cs.close();
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('promoteLongs', () => {
context('when set to true', () => {
it('does not convert Longs to numbers', {
metadata: { requires: { topology: '!single' } },
test: async function () {
cs = collection.watch([], { promoteLongs: true });

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;

expect(typeof change.fullDocument.a).to.equal('number');
}
});
});

context('when set to false', () => {
it('converts Long values to native numbers', {
metadata: { requires: { topology: '!single' } },
test: async function () {
cs = collection.watch([], { promoteLongs: false });

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;
expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long);
}
});
});

context('when omitted', () => {
it('defaults to true', {
metadata: { requires: { topology: '!single' } },
test: async function () {
cs = collection.watch([]);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

const change = await willBeChange;
expect(typeof change.fullDocument.a).to.equal('number');
}
});
});
});

context('invalid options', function () {
it('does not send invalid options on the aggregate command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
}
});

it('does not send invalid options on the getMore command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
cs = collection.watch([], doc);

const willBeChange = once(cs, 'change').then(args => args[0]);
await once(cs.cursor, 'init');

const result = await collection.insertOne({ a: Long.fromNumber(0) });
expect(result).to.exist;

await willBeChange;
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
}
});
});
});
});

0 comments on commit b2798d9

Please sign in to comment.