Skip to content

Commit

Permalink
fix(data-api): Fixed how data api driver uses and reuses a client (#6869
Browse files Browse the repository at this point in the history
)

Now each queryRunner will have it's own client which will allow it to handle transactions correctly.
  • Loading branch information
ArsenyYankovsky committed Nov 11, 2020
1 parent 296eb39 commit 6ce65fb
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 42 deletions.
28 changes: 14 additions & 14 deletions src/driver/aurora-data-api-pg/AuroraDataApiPostgresDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ export class AuroraDataApiPostgresDriver extends PostgresWrapper implements Driv
*/
DataApiDriver: any;

client: any;

// -------------------------------------------------------------------------
// Public Implemented Properties
// -------------------------------------------------------------------------
Expand All @@ -56,16 +54,6 @@ export class AuroraDataApiPostgresDriver extends PostgresWrapper implements Driv

// load data-api package
this.loadDependencies();

this.client = new this.DataApiDriver(
this.options.region,
this.options.secretArn,
this.options.resourceArn,
this.options.database,
(query: string, parameters?: any[]) => this.connection.logger.logQuery(query, parameters),
this.options.serviceConfigOptions,
this.options.formatOptions,
);
}

// -------------------------------------------------------------------------
Expand All @@ -90,7 +78,19 @@ export class AuroraDataApiPostgresDriver extends PostgresWrapper implements Driv
* Creates a query runner used to execute database queries.
*/
createQueryRunner(mode: ReplicationMode) {
return new AuroraDataApiPostgresQueryRunner(this, mode);
return new AuroraDataApiPostgresQueryRunner(
this,
new this.DataApiDriver(
this.options.region,
this.options.secretArn,
this.options.resourceArn,
this.options.database,
(query: string, parameters?: any[]) => this.connection.logger.logQuery(query, parameters),
this.options.serviceConfigOptions,
this.options.formatOptions,
),
mode
);
}

// -------------------------------------------------------------------------
Expand All @@ -110,7 +110,7 @@ export class AuroraDataApiPostgresDriver extends PostgresWrapper implements Driv
* Executes given query.
*/
protected executeQuery(connection: any, query: string) {
return this.client.query(query);
return this.connection.query(query);
}

/**
Expand Down
19 changes: 12 additions & 7 deletions src/driver/aurora-data-api-pg/AuroraDataApiPostgresQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
*/
driver: AuroraDataApiPostgresDriver;

protected client: any;

// -------------------------------------------------------------------------
// Protected Properties
// -------------------------------------------------------------------------
Expand All @@ -48,8 +50,10 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
// Constructor
// -------------------------------------------------------------------------

constructor(driver: AuroraDataApiPostgresDriver, mode: ReplicationMode) {
constructor(driver: AuroraDataApiPostgresDriver, client: any, mode: ReplicationMode) {
super(driver, mode);

this.client = client
}

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -99,7 +103,8 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

this.isTransactionActive = true;
await this.driver.client.startTransaction();

await this.client.startTransaction();

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
Expand All @@ -113,12 +118,13 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
async commitTransaction(): Promise<void> {
if (!this.isTransactionActive)
throw new TransactionNotStartedError();

const beforeBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.driver.client.commitTransaction();
await this.client.commitTransaction();

this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
Expand All @@ -138,8 +144,7 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.driver.client.rollbackTransaction();
this.isTransactionActive = false;
await this.client.rollbackTransaction();

const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionRollbackEvent(afterBroadcastResult);
Expand All @@ -153,7 +158,7 @@ export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper
if (this.isReleased)
throw new QueryRunnerAlreadyReleasedError();

const result = await this.driver.client.query(query, parameters);
const result = await this.client.query(query, parameters);

if (result.records) {
return result.records;
Expand Down
22 changes: 9 additions & 13 deletions src/driver/aurora-data-api/AuroraDataApiDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ export class AuroraDataApiDriver implements Driver {
*/
DataApiDriver: any;

client: any;

/**
* Connection pool.
* Used in non-replication mode.
Expand Down Expand Up @@ -301,16 +299,6 @@ export class AuroraDataApiDriver implements Driver {
// load mysql package
this.loadDependencies();

this.client = new this.DataApiDriver(
this.options.region,
this.options.secretArn,
this.options.resourceArn,
this.options.database,
(query: string, parameters?: any[]) => this.connection.logger.logQuery(query, parameters),
this.options.serviceConfigOptions,
this.options.formatOptions,
);

// validate options to make sure everything is set
// todo: revisit validation with replication in mind
// if (!(this.options.host || (this.options.extra && this.options.extra.socketPath)) && !this.options.socketPath)
Expand Down Expand Up @@ -357,7 +345,15 @@ export class AuroraDataApiDriver implements Driver {
* Creates a query runner used to execute database queries.
*/
createQueryRunner(mode: ReplicationMode) {
return new AuroraDataApiQueryRunner(this);
return new AuroraDataApiQueryRunner(this, new this.DataApiDriver(
this.options.region,
this.options.secretArn,
this.options.resourceArn,
this.options.database,
(query: string, parameters?: any[]) => this.connection.logger.logQuery(query, parameters),
this.options.serviceConfigOptions,
this.options.formatOptions,
));
}

/**
Expand Down
15 changes: 10 additions & 5 deletions src/driver/aurora-data-api/AuroraDataApiQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu

driver: AuroraDataApiDriver;

protected client: any

// -------------------------------------------------------------------------
// Protected Properties
// -------------------------------------------------------------------------
Expand All @@ -50,10 +52,11 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
// Constructor
// -------------------------------------------------------------------------

constructor(driver: AuroraDataApiDriver) {
constructor(driver: AuroraDataApiDriver, client: any) {
super();
this.driver = driver;
this.connection = driver.connection;
this.client = client;
this.broadcaster = new Broadcaster(this);
}

Expand Down Expand Up @@ -92,7 +95,8 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

this.isTransactionActive = true;
await this.driver.client.startTransaction();
await this.client.startTransaction();


const afterBroadcastResult = new BroadcasterResult();
this.broadcaster.broadcastAfterTransactionStartEvent(afterBroadcastResult);
Expand All @@ -111,7 +115,7 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
this.broadcaster.broadcastBeforeTransactionCommitEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.driver.client.commitTransaction();
await this.client.commitTransaction();
this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
Expand All @@ -131,7 +135,8 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
this.broadcaster.broadcastBeforeTransactionRollbackEvent(beforeBroadcastResult);
if (beforeBroadcastResult.promises.length > 0) await Promise.all(beforeBroadcastResult.promises);

await this.driver.client.rollbackTransaction();
await this.client.rollbackTransaction();

this.isTransactionActive = false;

const afterBroadcastResult = new BroadcasterResult();
Expand All @@ -146,7 +151,7 @@ export class AuroraDataApiQueryRunner extends BaseQueryRunner implements QueryRu
if (this.isReleased)
throw new QueryRunnerAlreadyReleasedError();

const result = await this.driver.client.query(query, parameters);
const result = await this.client.query(query, parameters);

if (result.records) {
return result.records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ describe("entity subscriber transaction flow", () => {
const queryRunner = await connection.createQueryRunner();

if (connection.driver instanceof AuroraDataApiPostgresDriver || connection.driver instanceof AuroraDataApiDriver) {
const startTransactionFn = sinon.spy(connection.driver.client.startTransaction);
const startTransactionFn = sinon.spy(queryRunner.startTransaction);
await queryRunner.startTransaction();

expect(beforeTransactionStart.calledBefore(startTransactionFn)).to.be.true;
Expand Down Expand Up @@ -130,7 +130,7 @@ describe("entity subscriber transaction flow", () => {
await queryRunner.startTransaction();

if (connection.driver instanceof AuroraDataApiPostgresDriver || connection.driver instanceof AuroraDataApiDriver) {
const commitTransactionFn = sinon.spy(connection.driver.client.commitTransaction);
const commitTransactionFn = sinon.spy(queryRunner.commitTransaction);
await queryRunner.commitTransaction();

expect(beforeTransactionCommit.calledBefore(commitTransactionFn)).to.be.true;
Expand Down Expand Up @@ -176,7 +176,7 @@ describe("entity subscriber transaction flow", () => {
await queryRunner.startTransaction();

if (connection.driver instanceof AuroraDataApiPostgresDriver || connection.driver instanceof AuroraDataApiDriver) {
const rollbackTransactionFn = sinon.spy(connection.driver.client.rollbackTransaction);
const rollbackTransactionFn = sinon.spy(queryRunner.rollbackTransaction);
await queryRunner.rollbackTransaction();

expect(beforeTransactionRollback.calledBefore(rollbackTransactionFn)).to.be.true;
Expand Down

0 comments on commit 6ce65fb

Please sign in to comment.