Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-1797): Error when ChangeStream used as iterator and emitter concurrently #2871

Merged
26 changes: 26 additions & 0 deletions src/change_stream.ts
Expand Up @@ -34,6 +34,8 @@ const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');
/** @internal */
const kClosed = Symbol('closed');
/** @internal */
const kMode = Symbol('mode');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand Down Expand Up @@ -204,6 +206,8 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch
[kCursorStream]?: Readable;
/** @internal */
[kClosed]: boolean;
/** @internal */
[kMode]: false | 'iterator' | 'emitter';
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

/** @event */
static readonly RESPONSE = 'response' as const;
Expand Down Expand Up @@ -270,6 +274,7 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch
this.cursor = createChangeStreamCursor(this, options);

this[kClosed] = false;
this[kMode] = false;

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
Expand Down Expand Up @@ -297,6 +302,7 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch

/** Check if there is any document still available in the Change Stream */
hasNext(callback?: Callback): Promise<void> | void {
setIsIterator(this);
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
Expand All @@ -311,6 +317,7 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch
next(
callback?: Callback<ChangeStreamDocument<TSchema>>
): Promise<ChangeStreamDocument<TSchema>> | void {
setIsIterator(this);
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
Expand Down Expand Up @@ -365,6 +372,7 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
setIsIterator(this);
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
Expand Down Expand Up @@ -533,6 +541,23 @@ const CHANGE_STREAM_EVENTS = [
ChangeStream.CLOSE
];

function setIsEmitter<TSchema>(changeStream: ChangeStream<TSchema>): void {
if (changeStream[kMode] === 'iterator') {
throw new MongoDriverError(
'Cannot use ChangeStream as an EventEmitter after using as an iterator'
);
}
changeStream[kMode] = 'emitter';
}

function setIsIterator<TSchema>(changeStream: ChangeStream<TSchema>): void {
if (changeStream[kMode] === 'emitter') {
throw new MongoDriverError(
'Cannot use ChangeStream as iterator after using as an EventEmitter'
);
}
changeStream[kMode] = 'iterator';
}
/**
* Create a new change stream cursor based on self's configuration
* @internal
Expand Down Expand Up @@ -628,6 +653,7 @@ function streamEvents<TSchema>(
changeStream: ChangeStream<TSchema>,
cursor: ChangeStreamCursor<TSchema>
): void {
setIsEmitter(changeStream);
const stream = changeStream[kCursorStream] || cursor.stream();
changeStream[kCursorStream] = stream;
stream.on('data', change => processNewChange(changeStream, change));
Expand Down
107 changes: 106 additions & 1 deletion test/functional/change_stream.test.js
@@ -1,7 +1,7 @@
'use strict';
const assert = require('assert');
const { Transform, PassThrough } = require('stream');
const { MongoNetworkError } = require('../../src/error');
const { MongoNetworkError, MongoDriverError } = require('../../src/error');
const { delay, setupDatabase, withClient, withCursor } = require('./shared');
const co = require('co');
const mock = require('../tools/mock');
Expand Down Expand Up @@ -1792,6 +1792,111 @@ describe('Change Streams', function () {
}
});

// FIXME: NODE-1797
describe('should error when used as iterator and emitter concurrently', function () {
let client, coll, changeStream, repeatInsert, val;
val = 0;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect().catch(() => expect.fail('Failed to connect to client'));

coll = client.db(this.configuration.db).collection('tester');
changeStream = coll.watch();

repeatInsert = setInterval(async function () {
await coll.insertOne({ c: val }).catch('Failed to insert document');
val++;
}, 75);
});

afterEach(async function () {
if (repeatInsert) {
clearInterval(repeatInsert);
}
if (changeStream) {
await changeStream.close();
}

await mock.cleanup();
if (client) {
await client.close();
}
});

it(
'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.on('change', resolve));
try {
await changeStream.hasNext().catch(err => {
expect.fail(err.message);
});
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await changeStream
.hasNext()
.catch(() => expect.fail('Failed to set changeStream to iterator'));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.once('change', resolve));
try {
await changeStream.next().catch(err => {
expect.fail(err.message);
});
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await changeStream
.tryNext()
.catch(() => expect.fail('Failed to set changeStream to iterator'));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);
});

describe('should properly handle a changeStream event being processed mid-close', function () {
let client, coll, changeStream;

Expand Down