From fedff2ed482934f812c7a9fb31b12cda2b089bc9 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 30 Jun 2021 14:37:58 -0400 Subject: [PATCH 1/6] fix(NODE-1843): bulk operations ignoring provided sessions --- src/bulk/common.ts | 3 +- test/functional/bulk.test.js | 78 ++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/src/bulk/common.ts b/src/bulk/common.ts index 5b08a86d0f..2658120a66 100644 --- a/src/bulk/common.ts +++ b/src/bulk/common.ts @@ -1200,7 +1200,8 @@ export abstract class BulkOperationBase { } this.s.executed = true; - return executeLegacyOperation(this.s.topology, executeCommands, [this, options, callback]); + const finalOptions = { ...this.s.options, ...options }; + return executeLegacyOperation(this.s.topology, executeCommands, [this, finalOptions, callback]); } /** diff --git a/test/functional/bulk.test.js b/test/functional/bulk.test.js index 9c69ee0ef0..ebd687a766 100644 --- a/test/functional/bulk.test.js +++ b/test/functional/bulk.test.js @@ -2003,4 +2003,82 @@ describe('Bulk', function () { ); }); }); + + describe('Bulk operation transaction rollback', () => { + /** @type {import('../../src/index').MongoClient} */ + let client; + /** @type {import('../../src/index').Collection<{ answer: number }>} */ + let collection; + + beforeEach(async function () { + const config = this.configuration; + client = config.newClient(); + await client.connect(); + + collection = client + .db('bulk_operation_writes_test') + .collection('bulk_write_transaction_test'); + + await collection.deleteMany({}); + }); + + it('should abort ordered/unordered bulk operation writes', async () => { + const session = client.startSession(); + session.startTransaction({ + readConcern: { level: 'snapshot' }, + writeConcern: { w: 'majority' } + }); + + let bulk = undefined; + + // 1. + bulk = collection.initializeUnorderedBulkOp({ session }); + bulk.insert({ answer: 42 }); + await bulk.execute(); + + // 2. + bulk = collection.initializeOrderedBulkOp({ session }); + bulk.insert({ answer: 43 }); + await bulk.execute(); + + await session.abortTransaction(); + await session.endSession(); + + const documents = await collection.find().toArray(); + + expect(documents).to.be.lengthOf(0, 'bulk operation writes were made outside of transaction'); + + await client.close(); // session leak checker doesn't let you put the close in an after hook + }); + + it('should abort ordered/unordered bulk operation writes using withTransaction', async () => { + const session = client.startSession(); + + await session.withTransaction( + async () => { + let bulk = undefined; + // 1. + bulk = collection.initializeUnorderedBulkOp({ session }); + bulk.insert({ answer: 42 }); + await bulk.execute(); + + // 2. + bulk = collection.initializeOrderedBulkOp({ session }); + bulk.insert({ answer: 43 }); + await bulk.execute(); + + await session.abortTransaction(); + }, + { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } } + ); + + await session.endSession(); + + const documents = await collection.find().toArray(); + + expect(documents).to.be.lengthOf(0, 'bulk operation writes were made outside of transaction'); + + await client.close(); // session leak checker doesn't let you put the close in an after hook + }); + }); }); From 7b2ad61b456bd0eaf78b30282a24504838868199 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 1 Jul 2021 14:19:55 -0400 Subject: [PATCH 2/6] test: add version and topology filters --- test/functional/bulk.test.js | 100 ++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 44 deletions(-) diff --git a/test/functional/bulk.test.js b/test/functional/bulk.test.js index ebd687a766..9c3c89aa0c 100644 --- a/test/functional/bulk.test.js +++ b/test/functional/bulk.test.js @@ -2022,63 +2022,75 @@ describe('Bulk', function () { await collection.deleteMany({}); }); - it('should abort ordered/unordered bulk operation writes', async () => { - const session = client.startSession(); - session.startTransaction({ - readConcern: { level: 'snapshot' }, - writeConcern: { w: 'majority' } - }); + it('should abort ordered/unordered bulk operation writes', { + metadata: { requires: { mongodb: '>= 3.6', topology: ['replicaset', 'sharded'] } }, + async test() { + const session = client.startSession(); + session.startTransaction({ + readConcern: { level: 'snapshot' }, + writeConcern: { w: 'majority' } + }); - let bulk = undefined; + let bulk = undefined; - // 1. - bulk = collection.initializeUnorderedBulkOp({ session }); - bulk.insert({ answer: 42 }); - await bulk.execute(); + // 1. + bulk = collection.initializeUnorderedBulkOp({ session }); + bulk.insert({ answer: 42 }); + await bulk.execute(); - // 2. - bulk = collection.initializeOrderedBulkOp({ session }); - bulk.insert({ answer: 43 }); - await bulk.execute(); + // 2. + bulk = collection.initializeOrderedBulkOp({ session }); + bulk.insert({ answer: 43 }); + await bulk.execute(); - await session.abortTransaction(); - await session.endSession(); + await session.abortTransaction(); + await session.endSession(); - const documents = await collection.find().toArray(); + const documents = await collection.find().toArray(); - expect(documents).to.be.lengthOf(0, 'bulk operation writes were made outside of transaction'); + expect(documents).to.be.lengthOf( + 0, + 'bulk operation writes were made outside of transaction' + ); - await client.close(); // session leak checker doesn't let you put the close in an after hook + await client.close(); // session leak checker doesn't let you put the close in an after hook + } }); - it('should abort ordered/unordered bulk operation writes using withTransaction', async () => { - const session = client.startSession(); - - await session.withTransaction( - async () => { - let bulk = undefined; - // 1. - bulk = collection.initializeUnorderedBulkOp({ session }); - bulk.insert({ answer: 42 }); - await bulk.execute(); - - // 2. - bulk = collection.initializeOrderedBulkOp({ session }); - bulk.insert({ answer: 43 }); - await bulk.execute(); - - await session.abortTransaction(); - }, - { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } } - ); + it('should abort ordered/unordered bulk operation writes using withTransaction', { + metadata: { requires: { mongodb: '>= 3.6', topology: ['replicaset', 'sharded'] } }, + async test() { + const session = client.startSession(); + + await session.withTransaction( + async () => { + let bulk = undefined; + // 1. + bulk = collection.initializeUnorderedBulkOp({ session }); + bulk.insert({ answer: 42 }); + await bulk.execute(); + + // 2. + bulk = collection.initializeOrderedBulkOp({ session }); + bulk.insert({ answer: 43 }); + await bulk.execute(); + + await session.abortTransaction(); + }, + { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } } + ); - await session.endSession(); + await session.endSession(); - const documents = await collection.find().toArray(); + const documents = await collection.find().toArray(); - expect(documents).to.be.lengthOf(0, 'bulk operation writes were made outside of transaction'); + expect(documents).to.be.lengthOf( + 0, + 'bulk operation writes were made outside of transaction' + ); - await client.close(); // session leak checker doesn't let you put the close in an after hook + await client.close(); // session leak checker doesn't let you put the close in an after hook + } }); }); }); From 9106f73b1f7cc50c59b4178ca7cda386721b3c83 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 1 Jul 2021 16:42:59 -0400 Subject: [PATCH 3/6] test: ensure collection is created before testing since it is required for multi doc transaction --- test/functional/bulk.test.js | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/test/functional/bulk.test.js b/test/functional/bulk.test.js index 9c3c89aa0c..4a3e45461a 100644 --- a/test/functional/bulk.test.js +++ b/test/functional/bulk.test.js @@ -2015,9 +2015,18 @@ describe('Bulk', function () { client = config.newClient(); await client.connect(); - collection = client + try { + await client + .db('bulk_operation_writes_test') + .collection('bulk_write_transaction_test') + .drop(); + } catch (_) { + // do not care + } + + collection = await client .db('bulk_operation_writes_test') - .collection('bulk_write_transaction_test'); + .createCollection('bulk_write_transaction_test'); await collection.deleteMany({}); }); @@ -2027,7 +2036,7 @@ describe('Bulk', function () { async test() { const session = client.startSession(); session.startTransaction({ - readConcern: { level: 'snapshot' }, + readConcern: { level: 'majority' }, writeConcern: { w: 'majority' } }); @@ -2077,7 +2086,7 @@ describe('Bulk', function () { await session.abortTransaction(); }, - { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } } + { readConcern: { level: 'majority' }, writeConcern: { w: 'majority' } } ); await session.endSession(); From 4697949ec2b7e5bf45fa81056959171063ac2f65 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 2 Jul 2021 14:08:58 -0400 Subject: [PATCH 4/6] test: filter topology, and set correct readConcernLevel --- test/functional/bulk.test.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/functional/bulk.test.js b/test/functional/bulk.test.js index 4a3e45461a..fd6530a301 100644 --- a/test/functional/bulk.test.js +++ b/test/functional/bulk.test.js @@ -2032,11 +2032,11 @@ describe('Bulk', function () { }); it('should abort ordered/unordered bulk operation writes', { - metadata: { requires: { mongodb: '>= 3.6', topology: ['replicaset', 'sharded'] } }, + metadata: { requires: { mongodb: '>= 3.6', topology: ['replicaset'] } }, async test() { const session = client.startSession(); session.startTransaction({ - readConcern: { level: 'majority' }, + readConcern: { level: 'local' }, writeConcern: { w: 'majority' } }); @@ -2067,7 +2067,7 @@ describe('Bulk', function () { }); it('should abort ordered/unordered bulk operation writes using withTransaction', { - metadata: { requires: { mongodb: '>= 3.6', topology: ['replicaset', 'sharded'] } }, + metadata: { requires: { mongodb: '>= 3.6', topology: ['replicaset'] } }, async test() { const session = client.startSession(); @@ -2086,7 +2086,7 @@ describe('Bulk', function () { await session.abortTransaction(); }, - { readConcern: { level: 'majority' }, writeConcern: { w: 'majority' } } + { readConcern: { level: 'local' }, writeConcern: { w: 'majority' } } ); await session.endSession(); From df3064baab41db0ff7c1b30b80b2e4acbcc27d8b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 2 Jul 2021 14:30:32 -0400 Subject: [PATCH 5/6] test: filter to later mongodb version --- test/functional/bulk.test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/functional/bulk.test.js b/test/functional/bulk.test.js index fd6530a301..a751947af0 100644 --- a/test/functional/bulk.test.js +++ b/test/functional/bulk.test.js @@ -2032,7 +2032,7 @@ describe('Bulk', function () { }); it('should abort ordered/unordered bulk operation writes', { - metadata: { requires: { mongodb: '>= 3.6', topology: ['replicaset'] } }, + metadata: { requires: { mongodb: '>= 4.2', topology: ['replicaset'] } }, async test() { const session = client.startSession(); session.startTransaction({ @@ -2067,7 +2067,7 @@ describe('Bulk', function () { }); it('should abort ordered/unordered bulk operation writes using withTransaction', { - metadata: { requires: { mongodb: '>= 3.6', topology: ['replicaset'] } }, + metadata: { requires: { mongodb: '>= 4.2', topology: ['replicaset'] } }, async test() { const session = client.startSession(); From 1f4b30e1f24f6f56b97a004c5c5c89f854db7e72 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 7 Jul 2021 13:28:14 -0400 Subject: [PATCH 6/6] test: cleanup --- test/functional/bulk.test.js | 77 ++++++++++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 17 deletions(-) diff --git a/test/functional/bulk.test.js b/test/functional/bulk.test.js index a751947af0..61d1cca369 100644 --- a/test/functional/bulk.test.js +++ b/test/functional/bulk.test.js @@ -2031,7 +2031,11 @@ describe('Bulk', function () { await collection.deleteMany({}); }); - it('should abort ordered/unordered bulk operation writes', { + afterEach(async () => { + if (client) await client.close(); + }); + + it('should abort ordered bulk operation writes', { metadata: { requires: { mongodb: '>= 4.2', topology: ['replicaset'] } }, async test() { const session = client.startSession(); @@ -2042,14 +2046,35 @@ describe('Bulk', function () { let bulk = undefined; - // 1. - bulk = collection.initializeUnorderedBulkOp({ session }); + bulk = collection.initializeOrderedBulkOp({ session }); bulk.insert({ answer: 42 }); await bulk.execute(); - // 2. - bulk = collection.initializeOrderedBulkOp({ session }); - bulk.insert({ answer: 43 }); + await session.abortTransaction(); + await session.endSession(); + + const documents = await collection.find().toArray(); + + expect(documents).to.have.lengthOf( + 0, + 'bulk operation writes were made outside of transaction' + ); + } + }); + + it('should abort unordered bulk operation writes', { + metadata: { requires: { mongodb: '>= 4.2', topology: ['replicaset'] } }, + async test() { + const session = client.startSession(); + session.startTransaction({ + readConcern: { level: 'local' }, + writeConcern: { w: 'majority' } + }); + + let bulk = undefined; + + bulk = collection.initializeUnorderedBulkOp({ session }); + bulk.insert({ answer: 42 }); await bulk.execute(); await session.abortTransaction(); @@ -2057,16 +2082,14 @@ describe('Bulk', function () { const documents = await collection.find().toArray(); - expect(documents).to.be.lengthOf( + expect(documents).to.have.lengthOf( 0, 'bulk operation writes were made outside of transaction' ); - - await client.close(); // session leak checker doesn't let you put the close in an after hook } }); - it('should abort ordered/unordered bulk operation writes using withTransaction', { + it('should abort unordered bulk operation writes using withTransaction', { metadata: { requires: { mongodb: '>= 4.2', topology: ['replicaset'] } }, async test() { const session = client.startSession(); @@ -2074,16 +2097,38 @@ describe('Bulk', function () { await session.withTransaction( async () => { let bulk = undefined; - // 1. + bulk = collection.initializeUnorderedBulkOp({ session }); bulk.insert({ answer: 42 }); await bulk.execute(); + await session.abortTransaction(); + }, + { readConcern: { level: 'local' }, writeConcern: { w: 'majority' } } + ); + + await session.endSession(); + + const documents = await collection.find().toArray(); + + expect(documents).to.have.lengthOf( + 0, + 'bulk operation writes were made outside of transaction' + ); + } + }); + + it('should abort ordered bulk operation writes using withTransaction', { + metadata: { requires: { mongodb: '>= 4.2', topology: ['replicaset'] } }, + async test() { + const session = client.startSession(); + + await session.withTransaction( + async () => { + let bulk = undefined; - // 2. bulk = collection.initializeOrderedBulkOp({ session }); - bulk.insert({ answer: 43 }); + bulk.insert({ answer: 42 }); await bulk.execute(); - await session.abortTransaction(); }, { readConcern: { level: 'local' }, writeConcern: { w: 'majority' } } @@ -2093,12 +2138,10 @@ describe('Bulk', function () { const documents = await collection.find().toArray(); - expect(documents).to.be.lengthOf( + expect(documents).to.have.lengthOf( 0, 'bulk operation writes were made outside of transaction' ); - - await client.close(); // session leak checker doesn't let you put the close in an after hook } }); });