Skip to content

Commit

Permalink
refactor: abstract away Broadcaster boilerplate (#8235)
Browse files Browse the repository at this point in the history
  • Loading branch information
imnotjames committed Oct 4, 2021
1 parent d8c5812 commit dd94c9d
Show file tree
Hide file tree
Showing 25 changed files with 240 additions and 416 deletions.
29 changes: 8 additions & 21 deletions src/driver/aurora-data-api-pg/AuroraDataApiPostgresQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {IsolationLevel} from "../types/IsolationLevel";
import {AuroraDataApiPostgresDriver} from "./AuroraDataApiPostgresDriver";
import {PostgresQueryRunner} from "../postgres/PostgresQueryRunner";
import {ReplicationMode} from "../types/ReplicationMode";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";
import { QueryResult } from "../../query-runner/QueryResult";

class PostgresQueryRunnerWrapper extends PostgresQueryRunner {
Expand Down Expand Up @@ -94,17 +93,13 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionStartEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionStart')

this.isTransactionActive = true;

await this.client.startTransaction();

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionStart')
}

/**
Expand All @@ -114,18 +109,14 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
async commitTransaction(): Promise<void> {
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.broadcaster.broadcast('BeforeTransactionCommit');

await this.client.commitTransaction();

this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionCommitEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionCommit');
}

/**
Expand All @@ -136,15 +127,11 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionRollback');

await this.client.rollbackTransaction();

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionRollback');
}

/**
Expand Down
26 changes: 6 additions & 20 deletions src/driver/aurora-data-api/AuroraDataApiQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import {ColumnType} from "../types/ColumnTypes";
import {TableCheck} from "../../schema-builder/table/TableCheck";
import {IsolationLevel} from "../types/IsolationLevel";
import {TableExclusion} from "../../schema-builder/table/TableExclusion";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";
import { TypeORMError } from "../../error";

/**
Expand Down Expand Up @@ -92,17 +91,12 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionStartEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionStart');

this.isTransactionActive = true;
await this.client.startTransaction();


const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionStart');
}

/**
Expand All @@ -113,16 +107,12 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionCommit');

await this.client.commitTransaction();
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionCommitEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionCommit');
}

/**
Expand All @@ -133,17 +123,13 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionRollback');

await this.client.rollbackTransaction();

this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionRollback');
}

/**
Expand Down
25 changes: 6 additions & 19 deletions src/driver/cockroachdb/CockroachQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {ColumnType} from "../types/ColumnTypes";
import {IsolationLevel} from "../types/IsolationLevel";
import {TableExclusion} from "../../schema-builder/table/TableExclusion";
import {ReplicationMode} from "../types/ReplicationMode";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";
import { TypeORMError } from "../../error";

/**
Expand Down Expand Up @@ -133,9 +132,7 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionStartEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionStart');

this.isTransactionActive = true;
await this.query("START TRANSACTION");
Expand All @@ -145,9 +142,7 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
}
this.storeQueries = true;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionStart');
}

/**
Expand All @@ -158,9 +153,7 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionCommit');

this.storeQueries = false;

Expand All @@ -180,9 +173,7 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
}
}

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionCommitEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionCommit');
}

/**
Expand All @@ -193,18 +184,14 @@ export class CockroachQueryRunner extends BaseQueryRunner implements QueryRunner
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionRollback');

this.storeQueries = false;
await this.query("ROLLBACK");
this.queries = [];
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionRollback');
}

/**
Expand Down
25 changes: 6 additions & 19 deletions src/driver/expo/ExpoQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {TransactionAlreadyStartedError} from "../../error/TransactionAlreadyStar
import {TransactionNotStartedError} from "../../error/TransactionNotStartedError";
import {ExpoDriver} from "./ExpoDriver";
import {Broadcaster} from "../../subscriber/Broadcaster";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";
import { QueryResult } from "../../query-runner/QueryResult";

// Needed to satisfy the Typescript compiler
Expand Down Expand Up @@ -68,15 +67,11 @@ export class ExpoQueryRunner extends AbstractSqliteQueryRunner {
if (this.isTransactionActive && typeof this.transaction !== "undefined")
throw new TransactionAlreadyStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionStartEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionStart');

this.isTransactionActive = true;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionStart');
}

/**
Expand All @@ -91,16 +86,12 @@ export class ExpoQueryRunner extends AbstractSqliteQueryRunner {
if (!this.isTransactionActive && typeof this.transaction === "undefined")
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionCommit');

this.isTransactionActive = false;
this.transaction = undefined;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionCommitEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionCommit');
}

/**
Expand All @@ -114,16 +105,12 @@ export class ExpoQueryRunner extends AbstractSqliteQueryRunner {
if (!this.isTransactionActive && typeof this.transaction === "undefined")
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionRollback');

this.isTransactionActive = false;
this.transaction = undefined;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionRollback');
}

/**
Expand Down
25 changes: 6 additions & 19 deletions src/driver/mysql/MysqlQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import {IsolationLevel} from "../types/IsolationLevel";
import {TableExclusion} from "../../schema-builder/table/TableExclusion";
import {VersionUtils} from "../../util/VersionUtils";
import {ReplicationMode} from "../types/ReplicationMode";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";
import { TypeORMError } from "../../error";

/**
Expand Down Expand Up @@ -112,9 +111,7 @@ export class MysqlQueryRunner extends BaseQueryRunner implements QueryRunner {
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionStartEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionStart');

this.isTransactionActive = true;
if (isolationLevel) {
Expand All @@ -124,9 +121,7 @@ export class MysqlQueryRunner extends BaseQueryRunner implements QueryRunner {
await this.query("START TRANSACTION");
}

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionStart');
}

/**
Expand All @@ -137,16 +132,12 @@ export class MysqlQueryRunner extends BaseQueryRunner implements QueryRunner {
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionCommit');

await this.query("COMMIT");
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionCommitEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionCommit');
}

/**
Expand All @@ -157,16 +148,12 @@ export class MysqlQueryRunner extends BaseQueryRunner implements QueryRunner {
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);
await this.broadcaster.broadcast('BeforeTransactionRollback');

await this.query("ROLLBACK");
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
if (afterBroadcastResult.promises.length > 0) await Promise.all(afterBroadcastResult.promises);
await this.broadcaster.broadcast('AfterTransactionRollback');
}

/**
Expand Down

0 comments on commit dd94c9d

Please sign in to comment.