Skip to content

Commit

Permalink
fix(NODE-5052): prevent cursor and changestream close logic from runn…
Browse files Browse the repository at this point in the history
…ing more than once (#3562)
  • Loading branch information
nbbeeken committed Feb 14, 2023
1 parent 4bac63c commit 71d0d79
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 196 deletions.
8 changes: 6 additions & 2 deletions .mocharc.json
Expand Up @@ -3,9 +3,13 @@
"require": [
"source-map-support/register",
"ts-node/register",
"test/tools/runner/chai-addons.js"
"test/tools/runner/chai-addons.js",
"test/tools/runner/hooks/unhandled_checker.ts"
],
"extension": [
"js",
"ts"
],
"extension": ["js", "ts"],
"recursive": true,
"timeout": 60000,
"failZero": true,
Expand Down
11 changes: 7 additions & 4 deletions src/cursor/abstract_cursor.ts
Expand Up @@ -828,13 +828,16 @@ function cleanupCursor(

cursor[kKilled] = true;

if (session.hasEnded) {
return completeCleanup();
}

executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session })
).finally(() => {
completeCleanup();
});
return;
)
.catch(() => null)
.finally(completeCleanup);
}

/** @internal */
Expand Down
24 changes: 24 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Expand Up @@ -1019,6 +1019,30 @@ describe('Change Streams', function () {
}
}
);

it(
'when closed throws "ChangeStream is closed"',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch();

const loop = (async function () {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _change of changeStream) {
return 'loop entered'; // loop should never be entered
}
return 'loop ended without error'; // loop should not finish without error
})();

await sleep(1);
const closeResult = changeStream.close().catch(error => error);
expect(closeResult).to.not.be.instanceOf(Error);

const result = await loop.catch(error => error);
expect(result).to.be.instanceOf(MongoAPIError);
expect(result.message).to.match(/ChangeStream is closed/i);
}
);
});

describe('#return', function () {
Expand Down
177 changes: 70 additions & 107 deletions test/integration/crud/crud_api.test.ts
@@ -1,4 +1,5 @@
import { expect } from 'chai';
import { on } from 'events';

import { MongoClient, MongoError, ObjectId, ReturnDocument } from '../../mongodb';
import { assert as test } from '../shared';
Expand Down Expand Up @@ -60,130 +61,92 @@ describe('CRUD API', function () {
await client.close();
});

it('should correctly execute find method using crud api', function (done) {
const db = client.db();

db.collection('t').insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }], function (err) {
expect(err).to.not.exist;

//
// Cursor
// --------------------------------------------------
const makeCursor = () => {
// Possible methods on the the cursor instance
return db
.collection('t')
.find({})
.filter({ a: 1 })
.addCursorFlag('noCursorTimeout', true)
.addQueryModifier('$comment', 'some comment')
.batchSize(2)
.comment('some comment 2')
.limit(2)
.maxTimeMS(50)
.project({ a: 1 })
.skip(0)
.sort({ a: 1 });
};
context('when creating a cursor with find', () => {
let collection;

//
// Exercise count method
// -------------------------------------------------
const countMethod = function () {
// Execute the different methods supported by the cursor
const cursor = makeCursor();
cursor.count(function (err, count) {
expect(err).to.not.exist;
test.equal(2, count);
eachMethod();
});
};
beforeEach(async () => {
collection = client.db().collection('t');
await collection.drop().catch(() => null);
await collection.insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }]);
});

//
// Exercise legacy method each
// -------------------------------------------------
const eachMethod = function () {
let count = 0;
afterEach(async () => {
await collection?.drop().catch(() => null);
});

const makeCursor = () => {
// Possible methods on the the cursor instance
return collection
.find({})
.filter({ a: 1 })
.addCursorFlag('noCursorTimeout', true)
.addQueryModifier('$comment', 'some comment')
.batchSize(1)
.comment('some comment 2')
.limit(2)
.maxTimeMS(50)
.project({ a: 1 })
.skip(0)
.sort({ a: 1 });
};

describe('#count()', () => {
it('returns the number of documents', async () => {
const cursor = makeCursor();
cursor.forEach(
() => {
count = count + 1;
},
err => {
expect(err).to.not.exist;
test.equal(2, count);
toArrayMethod();
}
);
};
const res = await cursor.count();
expect(res).to.equal(2);
});
});

//
// Exercise toArray
// -------------------------------------------------
const toArrayMethod = function () {
describe('#forEach()', () => {
it('iterates all the documents', async () => {
const cursor = makeCursor();
cursor.toArray(function (err, docs) {
expect(err).to.not.exist;
test.equal(2, docs.length);
nextMethod();
let count = 0;
await cursor.forEach(() => {
count += 1;
});
};
expect(count).to.equal(2);
});
});

//
// Exercise next method
// -------------------------------------------------
const nextMethod = function () {
describe('#toArray()', () => {
it('returns an array with all documents', async () => {
const cursor = makeCursor();
cursor.next(function (err, doc) {
expect(err).to.not.exist;
test.ok(doc != null);

cursor.next(function (err, doc) {
expect(err).to.not.exist;
test.ok(doc != null);
const res = await cursor.toArray();
expect(res).to.have.lengthOf(2);
});
});

cursor.next(function (err, doc) {
expect(err).to.not.exist;
expect(doc).to.not.exist;
streamMethod();
});
});
});
};
describe('#next()', () => {
it('is callable without blocking', async () => {
const cursor = makeCursor();
const doc0 = await cursor.next();
expect(doc0).to.exist;
const doc1 = await cursor.next();
expect(doc1).to.exist;
const doc2 = await cursor.next();
expect(doc2).to.not.exist;
});
});

//
// Exercise stream
// -------------------------------------------------
const streamMethod = function () {
let count = 0;
describe('#stream()', () => {
it('creates a node stream that emits data events', async () => {
const count = 0;
const cursor = makeCursor();
const stream = cursor.stream();
stream.on('data', function () {
count = count + 1;
});

on(stream, 'data');
cursor.once('close', function () {
test.equal(2, count);
explainMethod();
expect(count).to.equal(2);
});
};
});
});

//
// Explain method
// -------------------------------------------------
const explainMethod = function () {
describe('#explain()', () => {
it('returns an explain document', async () => {
const cursor = makeCursor();
cursor.explain(function (err, result) {
expect(err).to.not.exist;
test.ok(result != null);

client.close(done);
});
};

// Execute all the methods
countMethod();
const result = await cursor.explain();
expect(result).to.exist;
});
});
});

Expand Down
78 changes: 43 additions & 35 deletions test/integration/crud/find_cursor_methods.test.js
Expand Up @@ -108,50 +108,58 @@ describe('Find Cursor', function () {
});
});

context('#close', function () {
it('should send a killCursors command when closed before completely iterated', function (done) {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));
describe('#close', function () {
let collection;

const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.next(err => {
expect(err).to.not.exist;
cursor.close(err => {
expect(err).to.not.exist;
expect(commands).to.have.length(1);
done();
});
});
beforeEach(async function () {
collection = client.db().collection('abstract_cursor');
await collection.drop().catch(() => null);
await collection.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]);
});

it('should not send a killCursors command when closed after completely iterated', function (done) {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));
afterEach(async function () {
await collection?.drop().catch(() => null);
});

const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.toArray(err => {
expect(err).to.not.exist;
context('when closed before completely iterated', () => {
it('sends a killCursors command', async () => {
const killCursorsCommands = [];
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));

cursor.close(err => {
expect(err).to.not.exist;
expect(commands).to.have.length(0);
done();
});
const cursor = collection.find({}, { batchSize: 2 });

const doc = await cursor.next();
expect(doc).property('a', 1);

expect(killCursorsCommands).to.have.length(0);
await cursor.close();
expect(killCursorsCommands).to.have.length(1);
});
});

it('should not send a killCursors command when closed before initialization', function (done) {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));
context('when closed after completely iterated', () => {
it('does not send a killCursors command', async () => {
const killCursorsCommands = [];
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));

const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.close(err => {
expect(err).to.not.exist;
expect(commands).to.have.length(0);
done();
const cursor = collection.find();
await cursor.toArray();
expect(killCursorsCommands).to.have.length(0);
await cursor.close();
expect(killCursorsCommands).to.have.length(0);
});
});

context('when closed before initialization', () => {
it('does not send a killCursors command', async () => {
const killCursorsCommands = [];
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));

const cursor = collection.find();

expect(killCursorsCommands).to.have.length(0);
await cursor.close();
expect(killCursorsCommands).to.have.length(0);
});
});
});
Expand Down

0 comments on commit 71d0d79

Please sign in to comment.