Skip to content

Commit

Permalink
fix(NODE-4735): fix change stream consecutive resumability (#3453)
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Oct 26, 2022
1 parent d55022b commit 89b27e9
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 87 deletions.
67 changes: 39 additions & 28 deletions src/change_stream.ts
Expand Up @@ -648,21 +648,25 @@ export class ChangeStream<
hasNext(callback?: Callback): Promise<boolean> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
// Change streams must resume indefinitely while each resume event succeeds.
// This loop continues until either a change event is received or until a resume attempt
// fails.
// eslint-disable-next-line no-constant-condition
while (true) {
try {
await this._processErrorIteratorMode(error);
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand All @@ -675,23 +679,26 @@ export class ChangeStream<
next(callback?: Callback<TChange>): Promise<TChange> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
// Change streams must resume indefinitely while each resume event succeeds.
// This loop continues until either a change event is received or until a resume attempt
// fails.
// eslint-disable-next-line no-constant-condition
while (true) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand All @@ -706,21 +713,25 @@ export class ChangeStream<
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
// Change streams must resume indefinitely while each resume event succeeds.
// This loop continues until either a change event is received or until a resume attempt
// fails.
// eslint-disable-next-line no-constant-condition
while (true) {
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand Down
197 changes: 147 additions & 50 deletions test/integration/change-streams/change_stream.test.ts
Expand Up @@ -10,6 +10,7 @@ import { promisify } from 'util';
import {
AbstractCursor,
ChangeStream,
ChangeStreamDocument,
ChangeStreamOptions,
Collection,
CommandStartedEvent,
Expand Down Expand Up @@ -1037,56 +1038,6 @@ describe('Change Streams', function () {
});

describe('Change Stream Resume Error Tests', function () {
describe('TODO(NODE-4670): fix consecutive resumes unified tests', function () {
let client: MongoClient;
let changeStream: ChangeStream;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();
});

afterEach(async function () {
await changeStream.close();
await client.close();
});

it('should support consecutive resumes', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=4.2' } },
async test() {
const failCommand: FailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 2
},
data: {
failCommands: ['getMore'],
closeConnection: true
}
};

await client.db('admin').command(failCommand);

const collection = client.db('test_consecutive_resume').collection('collection');

changeStream = collection.watch([], { batchSize: 1 });

await initIteratorMode(changeStream);

await collection.insertOne({ name: 'bumpy' });
await collection.insertOne({ name: 'bumpy' });
await collection.insertOne({ name: 'bumpy' });

await sleep(1000);

for (let i = 0; i < 3; ++i) {
const change = await changeStream.next();
expect(change).not.to.be.null;
}
}
});
});

it.skip('should continue piping changes after a resumable error', {
metadata: { requires: { topology: 'replicaset' } },
test: done => {
Expand Down Expand Up @@ -1767,7 +1718,44 @@ describe('ChangeStream resumability', function () {
expect(aggregateEvents).to.have.lengthOf(2);
}
);

it(
`supports consecutive resumes on error code ${code} ${error}`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 5 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

// There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
// resuming a change stream don't return the change event. So we defer the insert until a period of time
// after the change stream has started listening for a change. 2000ms is long enough for the change
// stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
const [, value] = await Promise.allSettled([
sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
changeStream.next()
]);

const change = (value as PromiseFulfilledResult<ChangeStreamDocument>).value;

expect(change).to.have.property('operationType', 'insert');

expect(aggregateEvents).to.have.lengthOf(6);
}
);
}

for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
Expand Down Expand Up @@ -1896,6 +1884,42 @@ describe('ChangeStream resumability', function () {
expect(aggregateEvents).to.have.lengthOf(2);
}
);

it(
`supports consecutive resumes on error code ${code} ${error}`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 5 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

// There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
// resuming a change stream don't return the change event. So we defer the insert until a period of time
// after the change stream has started listening for a change. 2000ms is long enough for the change
// stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
const [, value] = await Promise.allSettled([
sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
changeStream.hasNext()
]);

const change = (value as PromiseFulfilledResult<boolean>).value;

expect(change).to.be.true;

expect(aggregateEvents).to.have.lengthOf(6);
}
);
}

for (const { error, code, message } of resumableErrorCodes) {
Expand Down Expand Up @@ -2033,6 +2057,42 @@ describe('ChangeStream resumability', function () {
expect(aggregateEvents).to.have.lengthOf(2);
}
);

it(
`supports consecutive resumes on error code ${code} ${error}`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 5 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

try {
// tryNext is not blocking and on sharded clusters we don't have control of when
// the actual change event will be ready on the change stream pipeline. This introduces
// a race condition, where sometimes we receive the change event and sometimes
// we don't when we call tryNext, depending on the timing of the sharded cluster.

// Since we really only care about the resumability, it's enough for this test to throw
// if tryNext ever throws and assert on the number of aggregate events.
await changeStream.tryNext();
} catch (err) {
expect.fail(`expected tryNext to resume, received error instead: ${err}`);
}

expect(aggregateEvents).to.have.lengthOf(6);
}
);
}

for (const { error, code, message } of resumableErrorCodes) {
Expand Down Expand Up @@ -2171,6 +2231,43 @@ describe('ChangeStream resumability', function () {
expect(aggregateEvents).to.have.lengthOf(2);
}
);

it(
`supports consecutive resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 5 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

const changes = once(changeStream, 'change');
await once(changeStream.cursor, 'init');

// There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
// resuming a change stream don't return the change event. So we defer the insert until a period of time
// after the change stream has started listening for a change. 2000ms is long enough for the change
// stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
const [, value] = await Promise.allSettled([
sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
changes
]);

const [change] = (value as PromiseFulfilledResult<ChangeStreamDocument[]>).value;
expect(change).to.have.property('operationType', 'insert');

expect(aggregateEvents).to.have.lengthOf(6);
}
);
}

it(
Expand Down
6 changes: 1 addition & 5 deletions test/integration/change-streams/change_streams.spec.test.ts
Expand Up @@ -4,9 +4,5 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('Change Streams Spec - Unified', function () {
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), test =>
test.description === 'Test consecutive resume'
? 'TODO(NODE-4670): fix consecutive resume change stream test'
: false
);
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')));
});
Expand Up @@ -23,10 +23,6 @@ const filter: TestFilter = ({ description }) => {
return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0';
}

if (description === 'Test consecutive resume') {
return 'TODO(NODE-4670): fix consecutive resume change stream test';
}

if (
process.env.AUTH === 'auth' &&
[
Expand Down

0 comments on commit 89b27e9

Please sign in to comment.