/
AuroraDataApiPostgresQueryRunner.ts
138 lines (112 loc) · 4.7 KB
/
AuroraDataApiPostgresQueryRunner.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import {QueryRunnerAlreadyReleasedError} from "../../error/QueryRunnerAlreadyReleasedError";
import {TransactionAlreadyStartedError} from "../../error/TransactionAlreadyStartedError";
import {TransactionNotStartedError} from "../../error/TransactionNotStartedError";
import {QueryRunner} from "../../query-runner/QueryRunner";
import {IsolationLevel} from "../types/IsolationLevel";
import {AuroraDataApiPostgresDriver} from "../postgres/PostgresDriver";
import {PostgresQueryRunner} from "../postgres/PostgresQueryRunner";
class PostgresQueryRunnerWrapper extends PostgresQueryRunner {
driver: any;
constructor(driver: any, mode: "master"|"slave") {
super(driver, mode);
}
}
/**
* Runs queries on a single postgres database connection.
*/
export class AuroraDataApiPostgresQueryRunner extends PostgresQueryRunnerWrapper implements QueryRunner {
// -------------------------------------------------------------------------
// Public Implemented Properties
// -------------------------------------------------------------------------
/**
* Database driver used by connection.
*/
driver: AuroraDataApiPostgresDriver;
// -------------------------------------------------------------------------
// Protected Properties
// -------------------------------------------------------------------------
/**
* Promise used to obtain a database connection for a first time.
*/
protected databaseConnectionPromise: Promise<any>;
/**
* Special callback provided by a driver used to release a created connection.
*/
protected releaseCallback: Function;
// -------------------------------------------------------------------------
// Constructor
// -------------------------------------------------------------------------
constructor(driver: AuroraDataApiPostgresDriver, mode: "master"|"slave" = "master") {
super(driver, mode);
}
// -------------------------------------------------------------------------
// Public Methods
// -------------------------------------------------------------------------
/**
* Creates/uses database connection from the connection pool to perform further operations.
* Returns obtained database connection.
*/
connect(): Promise<any> {
if (this.databaseConnection)
return Promise.resolve(this.databaseConnection);
if (this.databaseConnectionPromise)
return this.databaseConnectionPromise;
if (this.mode === "slave" && this.driver.isReplicated) {
this.databaseConnectionPromise = this.driver.obtainSlaveConnection().then(([ connection, release]: any[]) => {
this.driver.connectedQueryRunners.push(this);
this.databaseConnection = connection;
this.releaseCallback = release;
return this.databaseConnection;
});
} else { // master
this.databaseConnectionPromise = this.driver.obtainMasterConnection().then(([connection, release]: any[]) => {
this.driver.connectedQueryRunners.push(this);
this.databaseConnection = connection;
this.releaseCallback = release;
return this.databaseConnection;
});
}
return this.databaseConnectionPromise;
}
/**
* Starts transaction on the current connection.
*/
async startTransaction(isolationLevel?: IsolationLevel): Promise<void> {
if (this.isTransactionActive)
throw new TransactionAlreadyStartedError();
this.isTransactionActive = true;
await this.driver.client.startTransaction();
}
/**
* Commits transaction.
* Error will be thrown if transaction was not started.
*/
async commitTransaction(): Promise<void> {
if (!this.isTransactionActive)
throw new TransactionNotStartedError();
await this.driver.client.commitTransaction();
this.isTransactionActive = false;
}
/**
* Rollbacks transaction.
* Error will be thrown if transaction was not started.
*/
async rollbackTransaction(): Promise<void> {
if (!this.isTransactionActive)
throw new TransactionNotStartedError();
await this.driver.client.rollbackTransaction();
this.isTransactionActive = false;
}
/**
* Executes a given SQL query.
*/
async query(query: string, parameters?: any[]): Promise<any> {
if (this.isReleased)
throw new QueryRunnerAlreadyReleasedError();
const result = await this.driver.client.query(query, parameters);
if (result.records) {
return result.records;
}
return result;
}
}