Skip to content

Commit

Permalink
feat: add transactionAsyncLocalStorage option to opt in to automati…
Browse files Browse the repository at this point in the history
…cally setting `session` on all transactions

Fix #13889
  • Loading branch information
vkarpov15 committed May 10, 2024
1 parent 11c754c commit 819fde3
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 11 deletions.
31 changes: 28 additions & 3 deletions docs/transactions.md
@@ -1,8 +1,6 @@
# Transactions in Mongoose

[Transactions](https://www.mongodb.com/transactions) are new in MongoDB
4.0 and Mongoose 5.2.0. Transactions let you execute multiple operations
in isolation and potentially undo all the operations if one of them fails.
[Transactions](https://www.mongodb.com/transactions) let you execute multiple operations in isolation and potentially undo all the operations if one of them fails.
This guide will get you started using transactions with Mongoose.

<h2 id="getting-started-with-transactions"><a href="#getting-started-with-transactions">Getting Started with Transactions</a></h2>
Expand Down Expand Up @@ -86,6 +84,33 @@ Below is an example of executing an aggregation within a transaction.
[require:transactions.*aggregate]
```

<h2 id="asynclocalstorage"><a href="#asynclocalstorage">Using AsyncLocalStorage</a></h2>

One major pain point with transactions in Mongoose is that you need to remember to set the `session` option on every operation.
If you don't, your operation will execute outside of the transaction.
Mongoose 8.4 is able to set the `session` operation on all operations within a `Connection.prototype.transaction()` executor function using Node's [AsyncLocalStorage API](https://nodejs.org/api/async_context.html#class-asynclocalstorage).
Set the `transactionAsyncLocalStorage` option using `mongoose.set('transactionAsyncLocalStorage', true)` to enable this feature.

```javascript
mongoose.set('transactionAsyncLocalStorage', true);

const Test = mongoose.model('Test', mongoose.Schema({ name: String }));

const doc = new Test({ name: 'test' });

// Save a new doc in a transaction that aborts
await connection.transaction(async() => {
await doc.save(); // Notice no session here
throw new Error('Oops');
});

// false, `save()` was rolled back
await Test.exists({ _id: doc._id });
```

With `transactionAsyncLocalStorage`, you no longer need to pass sessions to every operation.
Mongoose will add the session by default under the hood.

<h2 id="advanced-usage"><a href="#advanced-usage">Advanced Usage</a></h2>

Advanced users who want more fine-grained control over when they commit or abort transactions
Expand Down
5 changes: 5 additions & 0 deletions lib/aggregate.js
Expand Up @@ -1022,6 +1022,11 @@ Aggregate.prototype.exec = async function exec() {
applyGlobalMaxTimeMS(this.options, model.db.options, model.base.options);
applyGlobalDiskUse(this.options, model.db.options, model.base.options);

const asyncLocalStorage = this.model?.db?.base.transactionAsyncLocalStorage?.getStore();
if (!this.options.hasOwnProperty('session') && asyncLocalStorage?.session != null) {
this.options.session = asyncLocalStorage.session;
}

if (this.options && this.options.cursor) {
return new AggregationCursor(this);
}
Expand Down
13 changes: 10 additions & 3 deletions lib/connection.js
Expand Up @@ -539,7 +539,7 @@ Connection.prototype.startSession = async function startSession(options) {
Connection.prototype.transaction = function transaction(fn, options) {
return this.startSession().then(session => {
session[sessionNewDocuments] = new Map();
return session.withTransaction(() => _wrapUserTransaction(fn, session), options).
return session.withTransaction(() => _wrapUserTransaction(fn, session, this.base), options).
then(res => {
delete session[sessionNewDocuments];
return res;
Expand All @@ -558,9 +558,16 @@ Connection.prototype.transaction = function transaction(fn, options) {
* Reset document state in between transaction retries re: gh-13698
*/

async function _wrapUserTransaction(fn, session) {
async function _wrapUserTransaction(fn, session, mongoose) {
try {
const res = await fn(session);
const res = mongoose.transactionAsyncLocalStorage == null
? await fn(session)
: await new Promise(resolve => {
mongoose.transactionAsyncLocalStorage.run(
{ session },
() => resolve(fn(session))
);
});
return res;
} catch (err) {
_resetSessionDocuments(session);
Expand Down
7 changes: 7 additions & 0 deletions lib/model.js
Expand Up @@ -296,8 +296,11 @@ Model.prototype.$__handleSave = function(options, callback) {
}

const session = this.$session();
const asyncLocalStorage = this.db.base.transactionAsyncLocalStorage?.getStore();
if (!saveOptions.hasOwnProperty('session') && session != null) {
saveOptions.session = session;
} else if (asyncLocalStorage?.session != null) {
saveOptions.session = asyncLocalStorage.session;
}
if (this.$isNew) {
// send entire doc
Expand Down Expand Up @@ -3533,6 +3536,10 @@ Model.bulkWrite = async function bulkWrite(ops, options) {
}

const validations = ops.map(op => castBulkWrite(this, op, options));
const asyncLocalStorage = this.db.base.transactionAsyncLocalStorage?.getStore();
if (!options.hasOwnProperty('session') && asyncLocalStorage.session != null) {
options = { ...options, session: asyncLocalStorage.session };
}

let res = null;
if (ordered) {
Expand Down
16 changes: 14 additions & 2 deletions lib/mongoose.js
Expand Up @@ -38,6 +38,8 @@ require('./helpers/printJestWarning');

const objectIdHexRegexp = /^[0-9A-Fa-f]{24}$/;

const { AsyncLocalStorage } = require('node:async_hooks');

/**
* Mongoose constructor.
*
Expand Down Expand Up @@ -101,6 +103,10 @@ function Mongoose(options) {
}
this.Schema.prototype.base = this;

if (options?.transactionAsyncLocalStorage) {
this.transactionAsyncLocalStorage = new AsyncLocalStorage();
}

Object.defineProperty(this, 'plugins', {
configurable: false,
enumerable: true,
Expand Down Expand Up @@ -267,15 +273,21 @@ Mongoose.prototype.set = function(key, value) {

if (optionKey === 'objectIdGetter') {
if (optionValue) {
Object.defineProperty(mongoose.Types.ObjectId.prototype, '_id', {
Object.defineProperty(_mongoose.Types.ObjectId.prototype, '_id', {
enumerable: false,
configurable: true,
get: function() {
return this;
}
});
} else {
delete mongoose.Types.ObjectId.prototype._id;
delete _mongoose.Types.ObjectId.prototype._id;
}
} else if (optionKey === 'transactionAsyncLocalStorage') {
if (optionValue && !_mongoose.transactionAsyncLocalStorage) {
_mongoose.transactionAsyncLocalStorage = new AsyncLocalStorage();
} else if (!optionValue && _mongoose.transactionAsyncLocalStorage) {
delete _mongoose.transactionAsyncLocalStorage;
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions lib/query.js
Expand Up @@ -1947,6 +1947,11 @@ Query.prototype._optionsForExec = function(model) {
// Apply schema-level `writeConcern` option
applyWriteConcern(model.schema, options);

const asyncLocalStorage = this.model.db.base.transactionAsyncLocalStorage?.getStore();
if (!this.options.hasOwnProperty('session') && asyncLocalStorage?.session != null) {
options.session = asyncLocalStorage.session;
}

const readPreference = model &&
model.schema &&
model.schema.options &&
Expand Down
1 change: 1 addition & 0 deletions lib/validOptions.js
Expand Up @@ -32,6 +32,7 @@ const VALID_OPTIONS = Object.freeze([
'strictQuery',
'toJSON',
'toObject',
'transactionAsyncLocalStorage',
'translateAliases'
]);

Expand Down
44 changes: 44 additions & 0 deletions test/docs/transactions.test.js
Expand Up @@ -351,6 +351,50 @@ describe('transactions', function() {
await session.endSession();
});

describe('transactionAsyncLocalStorage option', function() {
let m;
before(async function() {
m = new mongoose.Mongoose();
m.set('transactionAsyncLocalStorage', true);

await m.connect(start.uri);
});

after(async function() {
await m.disconnect();
});

it('transaction() sets `session` by default if transactionAsyncLocalStorage option is set', async function() {
const Test = m.model('Test', m.Schema({ name: String }));

await Test.createCollection();
await Test.deleteMany({});

const doc = new Test({ name: 'test' });
await assert.rejects(
() => m.connection.transaction(async() => {
await doc.save();

await Test.updateOne({ name: 'foo' }, { name: 'foo' }, { upsert: true });

let docs = await Test.aggregate([{ $match: { _id: doc._id } }]);
assert.equal(docs.length, 1);

const docs = await Test.find({ _id: doc._id });
assert.equal(docs.length, 1);

throw new Error('Oops!');
}),
/Oops!/
);
let exists = await Test.exists({ _id: doc._id });
assert.ok(!exists);

exists = await Test.exists({ name: 'foo' });
assert.ok(!exists);
});
});

it('transaction() resets $isNew on error', async function() {
db.deleteModel(/Test/);
const Test = db.model('Test', Schema({ name: String }));
Expand Down
3 changes: 0 additions & 3 deletions test/model.findByIdAndUpdate.test.js
Expand Up @@ -53,9 +53,6 @@ describe('model: findByIdAndUpdate:', function() {
'shape.side': 4,
'shape.color': 'white'
}, { new: true });
console.log('doc');
console.log(doc);
console.log('doc');

assert.equal(doc.shape.kind, 'gh8378_Square');
assert.equal(doc.shape.name, 'after');
Expand Down
7 changes: 7 additions & 0 deletions types/mongooseoptions.d.ts
Expand Up @@ -203,6 +203,13 @@ declare module 'mongoose' {
*/
toObject?: ToObjectOptions;

/**
* Set to true to make Mongoose use Node.js' built-in AsyncLocalStorage (Node >= 16.0.0)
* to set `session` option on all operations within a `connection.transaction(fn)` call
* by default. Defaults to false.
*/
transactionAsyncLocalStorage?: boolean;

/**
* If `true`, convert any aliases in filter, projection, update, and distinct
* to their database property names. Defaults to false.
Expand Down

0 comments on commit 819fde3

Please sign in to comment.