Skip to content

Commit

Permalink
feat: transactional events in subscriber interface + "transaction" op…
Browse files Browse the repository at this point in the history
…tion in FindOptions (#6996)

* added new decorators

* added hooks to etnity subscriber interface

* added test code in one query runner

* removed new decorators and implemented in subscriber instead

* added "transaction" option to FindOneOptions

* added event listeners in query runners;
added test for event listeners;
updated docs;

* working on test;

* working on test;

* added test for `transaction` option;

* fixing failing test

* fixing failing test

* fixing typos

* fixing test I broke

Co-authored-by: Dmitry Zotov <dmzt08@gmail.com>
  • Loading branch information
pleerock and AlexMesser committed Nov 2, 2020
1 parent f2ba901 commit 0e4b239
Show file tree
Hide file tree
Showing 23 changed files with 875 additions and 43 deletions.
112 changes: 98 additions & 14 deletions docs/listeners-and-subscribers.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ You must mark those methods with special decorators depending on what event you
### `@AfterLoad`

You can define a method with any name in entity and mark it with `@AfterLoad`
and TypeORM will call it each time the entity
and TypeORM will call it each time the entity
is loaded using `QueryBuilder` or repository/manager find methods.
Example:

```typescript
@Entity()
export class Post {

@AfterLoad()
updateCounters() {
if (this.likesCount === undefined)
Expand All @@ -43,7 +43,7 @@ Example:
```typescript
@Entity()
export class Post {

@BeforeInsert()
updateDates() {
this.createdDate = new Date();
Expand All @@ -60,7 +60,7 @@ Example:
```typescript
@Entity()
export class Post {

@AfterInsert()
resetCounters() {
this.counters = 0;
Expand All @@ -77,7 +77,7 @@ Example:
```typescript
@Entity()
export class Post {

@BeforeUpdate()
updateDates() {
this.updatedDate = new Date();
Expand All @@ -94,7 +94,7 @@ Example:
```typescript
@Entity()
export class Post {

@AfterUpdate()
updateCounters() {
this.counter = 0;
Expand All @@ -111,7 +111,7 @@ Example:
```typescript
@Entity()
export class Post {

@BeforeRemove()
updateStatus() {
this.status = "removed";
Expand All @@ -128,7 +128,7 @@ Example:
```typescript
@Entity()
export class Post {

@AfterRemove()
updateStatus() {
this.status = "removed";
Expand All @@ -146,14 +146,14 @@ Example:
@EventSubscriber()
export class PostSubscriber implements EntitySubscriberInterface<Post> {


/**
* Indicates that this subscriber only listen to Post events.
*/
listenTo() {
return Post;
}

/**
* Called before post insertion.
*/
Expand All @@ -170,15 +170,99 @@ To listen to any entity you just omit `listenTo` method and use `any`:
```typescript
@EventSubscriber()
export class PostSubscriber implements EntitySubscriberInterface {

/**
* Called before entity insertion.
* Called after entity is loaded.
*/
afterLoad(entity: any) {
console.log(`AFTER ENTITY LOADED: `, entity);
}

/**
* Called before post insertion.
*/
beforeInsert(event: InsertEvent<any>) {
console.log(`BEFORE ENTITY INSERTED: `, event.entity);
console.log(`BEFORE POST INSERTED: `, event.entity);
}

/**
* Called after entity insertion.
*/
afterInsert(event: InsertEvent<any>) {
console.log(`AFTER ENTITY INSERTED: `, event.entity);
}

/**
* Called before entity update.
*/
beforeUpdate(event: UpdateEvent<any>) {
console.log(`BEFORE ENTITY UPDATED: `, event.entity);
}

/**
* Called after entity update.
*/
afterUpdate(event: UpdateEvent<any>) {
console.log(`AFTER ENTITY UPDATED: `, event.entity);
}

/**
* Called before entity removal.
*/
beforeRemove(event: RemoveEvent<any>) {
console.log(`BEFORE ENTITY WITH ID ${event.entityId} REMOVED: `, event.entity);
}

/**
* Called after entity removal.
*/
afterRemove(event: RemoveEvent<any>) {
console.log(`AFTER ENTITY WITH ID ${event.entityId} REMOVED: `, event.entity);
}

/**
* Called before transaction start.
*/
beforeTransactionStart(event: TransactionStartEvent) {
console.log(`BEFORE TRANSACTION STARTED: `, event);
}

/**
* Called after transaction start.
*/
afterTransactionStart(event: TransactionStartEvent) {
console.log(`AFTER TRANSACTION STARTED: `, event);
}

/**
* Called before transaction commit.
*/
beforeTransactionCommit(event: TransactionCommitEvent) {
console.log(`BEFORE TRANSACTION COMMITTED: `, event);
}

/**
* Called after transaction commit.
*/
afterTransactionCommit(event: TransactionCommitEvent) {
console.log(`AFTER TRANSACTION COMMITTED: `, event);
}

/**
* Called before transaction rollback.
*/
beforeTransactionRollback(event: TransactionRollbackEvent) {
console.log(`BEFORE TRANSACTION ROLLBACK: `, event);
}

/**
* Called after transaction rollback.
*/
afterTransactionRollback(event: TransactionRollbackEvent) {
console.log(`AFTER TRANSACTION ROLLBACK: `, event);
}

}
```

Make sure your `subscribers` property is set in your [Connection Options](./connection-options.md#common-connection-options) so TypeORM loads your subscriber.
Make sure your `subscribers` property is set in your [Connection Options](./connection-options.md#common-connection-options) so TypeORM loads your subscriber.
25 changes: 25 additions & 0 deletions src/driver/aurora-data-api-pg/AuroraDataApiPostgresQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {IsolationLevel} from "../types/IsolationLevel";
import {AuroraDataApiPostgresDriver} from "./AuroraDataApiPostgresDriver";
import {PostgresQueryRunner} from "../postgres/PostgresQueryRunner";
import {ReplicationMode} from "../types/ReplicationMode";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";

class PostgresQueryRunnerWrapper extends PostgresQueryRunner {
driver: any;
Expand Down Expand Up @@ -93,8 +94,16 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionStartEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

this.isTransactionActive = true;
await this.driver.client.startTransaction();

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand All @@ -105,8 +114,16 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.driver.client.commitTransaction();
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionCommitEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand All @@ -117,8 +134,16 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.driver.client.rollbackTransaction();
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand Down
25 changes: 25 additions & 0 deletions src/driver/aurora-data-api/AuroraDataApiQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {ColumnType} from "../../index";
import {TableCheck} from "../../schema-builder/table/TableCheck";
import {IsolationLevel} from "../types/IsolationLevel";
import {TableExclusion} from "../../schema-builder/table/TableExclusion";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";

/**
* Runs queries on a single mysql database connection.
Expand Down Expand Up @@ -86,8 +87,16 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionStartEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

this.isTransactionActive = true;
await this.driver.client.startTransaction();

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand All @@ -98,8 +107,16 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.driver.client.commitTransaction();
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionCommitEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand All @@ -110,8 +127,16 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.driver.client.rollbackTransaction();
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand Down
25 changes: 25 additions & 0 deletions src/driver/cockroachdb/CockroachQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {ColumnType} from "../../index";
import {IsolationLevel} from "../types/IsolationLevel";
import {TableExclusion} from "../../schema-builder/table/TableExclusion";
import {ReplicationMode} from "../types/ReplicationMode";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";

/**
* Runs queries on a single postgres database connection.
Expand Down Expand Up @@ -130,13 +131,21 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionStartEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

this.isTransactionActive = true;
await this.query("START TRANSACTION");
await this.query("SAVEPOINT cockroach_restart");
if (isolationLevel) {
await this.query("SET TRANSACTION ISOLATION LEVEL " + isolationLevel);
}
this.storeQueries = true;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand All @@ -147,6 +156,10 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

this.storeQueries = false;

try {
Expand All @@ -164,6 +177,10 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
await this.commitTransaction();
}
}

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionCommitEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand All @@ -174,10 +191,18 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

this.storeQueries = false;
await this.query("ROLLBACK");
this.queries = [];
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
}

/**
Expand Down

0 comments on commit 0e4b239

Please sign in to comment.