From b2798d939ee9ed7a5ff9a63980ad6a29ddea3cf1 Mon Sep 17 00:00:00 2001 From: Kwabena Ampofo Date: Mon, 23 May 2022 09:54:51 -0400 Subject: [PATCH] fix(NODE-4103): respect BSON options when creating change streams (#3247) --- src/change_stream.ts | 4 +- .../change-streams/change_stream.test.ts | 123 +++++++++++++++++- 2 files changed, 123 insertions(+), 4 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 4997206df0..b3788ec0f4 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -647,8 +647,6 @@ export class ChangeStream< } const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline]; - const cursorOptions: ChangeStreamCursorOptions = filterOptions(options, CURSOR_OPTIONS); - const client: MongoClient | null = this.type === CHANGE_DOMAIN_TYPES.CLUSTER ? (this.parent as MongoClient) @@ -669,7 +667,7 @@ export class ChangeStream< client, this.namespace, pipeline, - cursorOptions + options ); for (const event of CHANGE_STREAM_EVENTS) { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 139c0680ed..8dc15156bc 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,6 +1,7 @@ import { strict as assert } from 'assert'; import { expect } from 'chai'; import * as crypto from 'crypto'; +import { once } from 'events'; import * as sinon from 'sinon'; import { PassThrough, Transform } from 'stream'; @@ -8,6 +9,7 @@ import { ChangeStream, ChangeStreamOptions, Collection, + Db, Long, MongoClient, MongoNetworkError, @@ -22,7 +24,7 @@ import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/utils'; -import { delay, setupDatabase, withClient, withCursor } from '../shared'; +import { delay, filterForCommands, setupDatabase, withClient, withCursor } from '../shared'; function withChangeStream( callback: (collection: Collection, changeStream: ChangeStream, done: Mocha.Done) => void @@ -1990,4 +1992,123 @@ describe('Change Streams', function () { .toJSON() ) .run(); + + describe('BSON Options', function () { + let client: MongoClient; + let db: Db; + let collection: Collection; + let cs: ChangeStream; + beforeEach(async function () { + client = await this.configuration.newClient({ monitorCommands: true }).connect(); + db = client.db('db'); + collection = await db.createCollection('collection'); + }); + afterEach(async function () { + await db.dropCollection('collection'); + await cs.close(); + await client.close(); + client = undefined; + db = undefined; + collection = undefined; + }); + + context('promoteLongs', () => { + context('when set to true', () => { + it('does not convert Longs to numbers', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + cs = collection.watch([], { promoteLongs: true }); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + + expect(typeof change.fullDocument.a).to.equal('number'); + } + }); + }); + + context('when set to false', () => { + it('converts Long values to native numbers', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + cs = collection.watch([], { promoteLongs: false }); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long); + } + }); + }); + + context('when omitted', () => { + it('defaults to true', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + cs = collection.watch([]); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + const change = await willBeChange; + expect(typeof change.fullDocument.a).to.equal('number'); + } + }); + }); + }); + + context('invalid options', function () { + it('does not send invalid options on the aggregate command', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + const started = []; + + client.on('commandStarted', filterForCommands(['aggregate'], started)); + const doc = { invalidBSONOption: true }; + cs = collection.watch([], doc); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + await willBeChange; + expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption'); + } + }); + + it('does not send invalid options on the getMore command', { + metadata: { requires: { topology: '!single' } }, + test: async function () { + const started = []; + + client.on('commandStarted', filterForCommands(['aggregate'], started)); + const doc = { invalidBSONOption: true }; + cs = collection.watch([], doc); + + const willBeChange = once(cs, 'change').then(args => args[0]); + await once(cs.cursor, 'init'); + + const result = await collection.insertOne({ a: Long.fromNumber(0) }); + expect(result).to.exist; + + await willBeChange; + expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption'); + } + }); + }); + }); });