Skip to content

Commit

Permalink
feat: add nested transaction (#8541)
Browse files Browse the repository at this point in the history
* feat: add nested transaction

This will allow nested transaction for postgres driver using "save point"

Closes: #1505

* package-lock.json

* removed `RELEASE SAVEPOINT` from OracleQueryRunner

* fixed transaction support in sqljs-based drivers

* improved nested transactions logic across all drivers;
code refactoring;

* added nested transactions support for mssql

* fixed failing test

Co-authored-by: mortzprk <mortz.prk@gmail.com>
Co-authored-by: AlexMesser
  • Loading branch information
mortezaPRK and mortzprk committed Feb 26, 2022
1 parent 33b2bd7 commit 6523526
Show file tree
Hide file tree
Showing 27 changed files with 577 additions and 179 deletions.
7 changes: 3 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/driver/Driver.ts
Expand Up @@ -48,6 +48,11 @@ export interface Driver {
*/
treeSupport: boolean;

/**
* Represent transaction support by this driver
*/
transactionSupport: "simple" | "nested" | "none";

/**
* Gets list of supported column data types by a driver.
*/
Expand Down
5 changes: 5 additions & 0 deletions src/driver/aurora-data-api-pg/AuroraDataApiPostgresDriver.ts
Expand Up @@ -33,6 +33,11 @@ export class AuroraDataApiPostgresDriver extends PostgresWrapper implements Driv

client: any;

/**
* Represent transaction support by this driver
*/
transactionSupport = "nested" as const;

// -------------------------------------------------------------------------
// Public Implemented Properties
// -------------------------------------------------------------------------
Expand Down
41 changes: 28 additions & 13 deletions src/driver/aurora-data-api-pg/AuroraDataApiPostgresQueryRunner.ts
@@ -1,12 +1,11 @@
import {QueryRunnerAlreadyReleasedError} from "../../error/QueryRunnerAlreadyReleasedError";
import {TransactionAlreadyStartedError} from "../../error/TransactionAlreadyStartedError";
import {TransactionNotStartedError} from "../../error/TransactionNotStartedError";
import {QueryRunner} from "../../query-runner/QueryRunner";
import {IsolationLevel} from "../types/IsolationLevel";
import {AuroraDataApiPostgresDriver} from "./AuroraDataApiPostgresDriver";
import {PostgresQueryRunner} from "../postgres/PostgresQueryRunner";
import {ReplicationMode} from "../types/ReplicationMode";
import { QueryResult } from "../../query-runner/QueryResult";
import {QueryResult} from "../../query-runner/QueryResult";

class PostgresQueryRunnerWrapper extends PostgresQueryRunner {
driver: any;
Expand Down Expand Up @@ -90,16 +89,22 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
* Starts transaction on the current connection.
*/
async startTransaction(isolationLevel?: IsolationLevel): Promise<void> {
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

await this.broadcaster.broadcast('BeforeTransactionStart')

this.isTransactionActive = true;
try {
await this.broadcaster.broadcast('BeforeTransactionStart');
} catch (err) {
this.isTransactionActive = false;
throw err;
}

await this.client.startTransaction();
if (this.transactionDepth === 0) {
await this.client.startTransaction();
} else {
await this.query(`SAVEPOINT typeorm_${this.transactionDepth}`);
}
this.transactionDepth += 1;

await this.broadcaster.broadcast('AfterTransactionStart')
await this.broadcaster.broadcast('AfterTransactionStart');
}

/**
Expand All @@ -112,9 +117,13 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper

await this.broadcaster.broadcast('BeforeTransactionCommit');

await this.client.commitTransaction();

this.isTransactionActive = false;
if (this.transactionDepth > 1) {
await this.query(`RELEASE SAVEPOINT typeorm_${this.transactionDepth - 1}`);
} else {
await this.client.commitTransaction();
this.isTransactionActive = false;
}
this.transactionDepth -= 1;

await this.broadcaster.broadcast('AfterTransactionCommit');
}
Expand All @@ -129,7 +138,13 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper

await this.broadcaster.broadcast('BeforeTransactionRollback');

await this.client.rollbackTransaction();
if (this.transactionDepth > 1) {
await this.query(`ROLLBACK TO SAVEPOINT typeorm_${this.transactionDepth - 1}`);
} else {
await this.client.rollbackTransaction();
this.isTransactionActive = false;
}
this.transactionDepth -= 1;

await this.broadcaster.broadcast('AfterTransactionRollback');
}
Expand Down
5 changes: 5 additions & 0 deletions src/driver/aurora-data-api/AuroraDataApiDriver.ts
Expand Up @@ -79,6 +79,11 @@ export class AuroraDataApiDriver implements Driver {
*/
treeSupport = true;

/**
* Represent transaction support by this driver
*/
transactionSupport = "nested" as const;

/**
* Gets list of supported column data types by a driver.
*
Expand Down
41 changes: 28 additions & 13 deletions src/driver/aurora-data-api/AuroraDataApiQueryRunner.ts
@@ -1,7 +1,6 @@
import {QueryResult} from "../../query-runner/QueryResult";
import {QueryRunner} from "../../query-runner/QueryRunner";
import {ObjectLiteral} from "../../common/ObjectLiteral";
import {TransactionAlreadyStartedError} from "../../error/TransactionAlreadyStartedError";
import {TransactionNotStartedError} from "../../error/TransactionNotStartedError";
import {TableColumn} from "../../schema-builder/table/TableColumn";
import {Table} from "../../schema-builder/table/Table";
Expand All @@ -21,7 +20,7 @@ import {ColumnType} from "../types/ColumnTypes";
import {TableCheck} from "../../schema-builder/table/TableCheck";
import {IsolationLevel} from "../types/IsolationLevel";
import {TableExclusion} from "../../schema-builder/table/TableExclusion";
import { TypeORMError } from "../../error";
import {TypeORMError} from "../../error";
import {MetadataTableType} from "../types/MetadataTableType";

/**
Expand Down Expand Up @@ -89,13 +88,20 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
* Starts transaction on the current connection.
*/
async startTransaction(isolationLevel?: IsolationLevel): Promise<void> {
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

await this.broadcaster.broadcast('BeforeTransactionStart');

this.isTransactionActive = true;
await this.client.startTransaction();
try {
await this.broadcaster.broadcast('BeforeTransactionStart');
} catch (err) {
this.isTransactionActive = false;
throw err;
}

if (this.transactionDepth === 0) {
await this.client.startTransaction();
} else {
await this.query(`SAVEPOINT typeorm_${this.transactionDepth}`);
}
this.transactionDepth += 1;

await this.broadcaster.broadcast('AfterTransactionStart');
}
Expand All @@ -110,8 +116,13 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu

await this.broadcaster.broadcast('BeforeTransactionCommit');

await this.client.commitTransaction();
this.isTransactionActive = false;
if (this.transactionDepth > 1) {
await this.query(`RELEASE SAVEPOINT typeorm_${this.transactionDepth - 1}`);
} else {
await this.client.commitTransaction();
this.isTransactionActive = false;
}
this.transactionDepth -= 1;

await this.broadcaster.broadcast('AfterTransactionCommit');
}
Expand All @@ -126,9 +137,13 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu

await this.broadcaster.broadcast('BeforeTransactionRollback');

await this.client.rollbackTransaction();

this.isTransactionActive = false;
if (this.transactionDepth > 1) {
await this.query(`ROLLBACK TO SAVEPOINT typeorm_${this.transactionDepth - 1}`);
} else {
await this.client.rollbackTransaction();
this.isTransactionActive = false;
}
this.transactionDepth -= 1;

await this.broadcaster.broadcast('AfterTransactionRollback');
}
Expand Down
6 changes: 6 additions & 0 deletions src/driver/cockroachdb/CockroachDriver.ts
Expand Up @@ -99,6 +99,12 @@ export class CockroachDriver implements Driver {
*/
treeSupport = true;

/**
* Represent transaction support by this driver
*/
transactionSupport = "nested" as const;


/**
* Gets list of supported column data types by a driver.
*
Expand Down
75 changes: 46 additions & 29 deletions src/driver/cockroachdb/CockroachQueryRunner.ts
@@ -1,7 +1,6 @@
import {QueryResult} from "../../query-runner/QueryResult";
import {QueryRunner} from "../../query-runner/QueryRunner";
import {ObjectLiteral} from "../../common/ObjectLiteral";
import {TransactionAlreadyStartedError} from "../../error/TransactionAlreadyStartedError";
import {TransactionNotStartedError} from "../../error/TransactionNotStartedError";
import {TableColumn} from "../../schema-builder/table/TableColumn";
import {Table} from "../../schema-builder/table/Table";
Expand All @@ -23,7 +22,7 @@ import {ColumnType} from "../types/ColumnTypes";
import {IsolationLevel} from "../types/IsolationLevel";
import {TableExclusion} from "../../schema-builder/table/TableExclusion";
import {ReplicationMode} from "../types/ReplicationMode";
import { TypeORMError } from "../../error";
import {TypeORMError} from "../../error";
import {MetadataTableType} from "../types/MetadataTableType";

/**
Expand Down Expand Up @@ -130,17 +129,25 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
* Starts transaction.
*/
async startTransaction(isolationLevel?: IsolationLevel): Promise<void> {
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

await this.broadcaster.broadcast('BeforeTransactionStart');

this.isTransactionActive = true;
await this.query("START TRANSACTION");
await this.query("SAVEPOINT cockroach_restart");
if (isolationLevel) {
await this.query("SET TRANSACTION ISOLATION LEVEL " + isolationLevel);
try {
await this.broadcaster.broadcast('BeforeTransactionStart');
} catch (err) {
this.isTransactionActive = false;
throw err;
}

if (this.transactionDepth === 0) {
await this.query("START TRANSACTION");
await this.query("SAVEPOINT cockroach_restart");
if (isolationLevel) {
await this.query("SET TRANSACTION ISOLATION LEVEL " + isolationLevel);
}
} else {
await this.query(`SAVEPOINT typeorm_${this.transactionDepth}`)
}

this.transactionDepth += 1;
this.storeQueries = true;

await this.broadcaster.broadcast('AfterTransactionStart');
Expand All @@ -156,21 +163,26 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner

await this.broadcaster.broadcast('BeforeTransactionCommit');

this.storeQueries = false;

try {
await this.query("RELEASE SAVEPOINT cockroach_restart");
await this.query("COMMIT");
this.queries = [];
this.isTransactionActive = false;

} catch (e) {
if (e.code === "40001") {
await this.query("ROLLBACK TO SAVEPOINT cockroach_restart");
for (const q of this.queries) {
await this.query(q.query, q.parameters);
if (this.transactionDepth > 1) {
await this.query(`RELEASE SAVEPOINT typeorm_${this.transactionDepth - 1}`);
this.transactionDepth -= 1;
} else {
this.storeQueries = false;
try {
await this.query("RELEASE SAVEPOINT cockroach_restart");
await this.query("COMMIT");
this.queries = [];
this.isTransactionActive = false;
this.transactionDepth -= 1;

} catch (e) {
if (e.code === "40001") {
await this.query("ROLLBACK TO SAVEPOINT cockroach_restart");
for (const q of this.queries) {
await this.query(q.query, q.parameters);
}
await this.commitTransaction();
}
await this.commitTransaction();
}
}

Expand All @@ -187,10 +199,15 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner

await this.broadcaster.broadcast('BeforeTransactionRollback');

this.storeQueries = false;
await this.query("ROLLBACK");
this.queries = [];
this.isTransactionActive = false;
if (this.transactionDepth > 1) {
await this.query(`ROLLBACK TO SAVEPOINT typeorm_${this.transactionDepth - 1}`);
} else {
this.storeQueries = false;
await this.query("ROLLBACK");
this.queries = [];
this.isTransactionActive = false;
}
this.transactionDepth -= 1;

await this.broadcaster.broadcast('AfterTransactionRollback');
}
Expand Down

0 comments on commit 6523526

Please sign in to comment.