diff --git a/src/change_stream.ts b/src/change_stream.ts index 45deb2b97f..9e4ea69452 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -34,6 +34,8 @@ const kResumeQueue = Symbol('resumeQueue'); const kCursorStream = Symbol('cursorStream'); /** @internal */ const kClosed = Symbol('closed'); +/** @internal */ +const kMode = Symbol('mode'); const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( @@ -206,6 +208,8 @@ export class ChangeStream extends TypedEven [kCursorStream]?: Readable; /** @internal */ [kClosed]: boolean; + /** @internal */ + [kMode]: false | 'iterator' | 'emitter'; /** @event */ static readonly RESPONSE = 'response' as const; @@ -272,6 +276,7 @@ export class ChangeStream extends TypedEven this.cursor = createChangeStreamCursor(this, options); this[kClosed] = false; + this[kMode] = false; // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { @@ -299,6 +304,7 @@ export class ChangeStream extends TypedEven /** Check if there is any document still available in the Change Stream */ hasNext(callback?: Callback): Promise | void { + setIsIterator(this); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -313,6 +319,7 @@ export class ChangeStream extends TypedEven next( callback?: Callback> ): Promise> | void { + setIsIterator(this); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -367,6 +374,7 @@ export class ChangeStream extends TypedEven tryNext(): Promise; tryNext(callback: Callback): void; tryNext(callback?: Callback): Promise | void { + setIsIterator(this); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -535,6 +543,23 @@ const CHANGE_STREAM_EVENTS = [ ChangeStream.CLOSE ]; +function setIsEmitter(changeStream: ChangeStream): void { + if (changeStream[kMode] === 'iterator') { + throw new MongoDriverError( + 'Cannot use ChangeStream as an EventEmitter after using as an iterator' + ); + } + changeStream[kMode] = 'emitter'; +} + +function setIsIterator(changeStream: ChangeStream): void { + if (changeStream[kMode] === 'emitter') { + throw new MongoDriverError( + 'Cannot use ChangeStream as iterator after using as an EventEmitter' + ); + } + changeStream[kMode] = 'iterator'; +} /** * Create a new change stream cursor based on self's configuration * @internal @@ -630,6 +655,7 @@ function streamEvents( changeStream: ChangeStream, cursor: ChangeStreamCursor ): void { + setIsEmitter(changeStream); const stream = changeStream[kCursorStream] || cursor.stream(); changeStream[kCursorStream] = stream; stream.on('data', change => processNewChange(changeStream, change)); diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 1ee60286e8..accb54c65c 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1,7 +1,7 @@ 'use strict'; const assert = require('assert'); const { Transform, PassThrough } = require('stream'); -const { MongoNetworkError } = require('../../src/error'); +const { MongoNetworkError, MongoDriverError } = require('../../src/error'); const { delay, setupDatabase, withClient, withCursor } = require('./shared'); const co = require('co'); const mock = require('../tools/mock'); @@ -1792,6 +1792,111 @@ describe('Change Streams', function () { } }); + // FIXME: NODE-1797 + describe('should error when used as iterator and emitter concurrently', function () { + let client, coll, changeStream, repeatInsert, val; + val = 0; + + beforeEach(async function () { + client = this.configuration.newClient(); + await client.connect().catch(() => expect.fail('Failed to connect to client')); + + coll = client.db(this.configuration.db).collection('tester'); + changeStream = coll.watch(); + + repeatInsert = setInterval(async function () { + await coll.insertOne({ c: val }).catch('Failed to insert document'); + val++; + }, 75); + }); + + afterEach(async function () { + if (repeatInsert) { + clearInterval(repeatInsert); + } + if (changeStream) { + await changeStream.close(); + } + + await mock.cleanup(); + if (client) { + await client.close(); + } + }); + + it( + 'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext"', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function () { + await new Promise(resolve => changeStream.on('change', resolve)); + try { + await changeStream.hasNext().catch(err => { + expect.fail(err.message); + }); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); + } + } + ); + + it( + 'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on"', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function () { + await changeStream + .hasNext() + .catch(() => expect.fail('Failed to set changeStream to iterator')); + try { + await new Promise(resolve => changeStream.on('change', resolve)); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); + } + } + ); + + it( + 'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next"', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function () { + await new Promise(resolve => changeStream.once('change', resolve)); + try { + await changeStream.next().catch(err => { + expect.fail(err.message); + }); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); + } + } + ); + + it( + 'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on"', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function () { + await changeStream + .tryNext() + .catch(() => expect.fail('Failed to set changeStream to iterator')); + try { + await new Promise(resolve => changeStream.on('change', resolve)); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); + } + } + ); + }); + describe('should properly handle a changeStream event being processed mid-close', function () { let client, coll, changeStream;