/
SqliteQueryRunner.ts
95 lines (79 loc) · 3.56 KB
/
SqliteQueryRunner.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import {QueryRunnerAlreadyReleasedError} from "../../error/QueryRunnerAlreadyReleasedError";
import {QueryFailedError} from "../../error/QueryFailedError";
import {AbstractSqliteQueryRunner} from "../sqlite-abstract/AbstractSqliteQueryRunner";
import {SqliteConnectionOptions} from "./SqliteConnectionOptions";
import {SqliteDriver} from "./SqliteDriver";
import {Broadcaster} from "../../subscriber/Broadcaster";
import {IsolationLevel} from "../types/IsolationLevel";
/**
* Runs queries on a single sqlite database connection.
*
* Does not support compose primary keys with autoincrement field.
* todo: need to throw exception for this case.
*/
export class SqliteQueryRunner extends AbstractSqliteQueryRunner {
/**
* Database driver used by connection.
*/
driver: SqliteDriver;
// -------------------------------------------------------------------------
// Constructor
// -------------------------------------------------------------------------
constructor(driver: SqliteDriver) {
super();
this.driver = driver;
this.connection = driver.connection;
this.broadcaster = new Broadcaster(this);
}
/**
* Executes a given SQL query.
*/
query(query: string, parameters?: any[]): Promise<any> {
if (this.isReleased)
throw new QueryRunnerAlreadyReleasedError();
const connection = this.driver.connection;
const options = connection.options as SqliteConnectionOptions;
return new Promise<any[]>(async (ok, fail) => {
const databaseConnection = await this.connect();
this.driver.connection.logger.logQuery(query, parameters, this);
const queryStartTime = +new Date();
const isInsertQuery = query.substr(0, 11) === "INSERT INTO";
const execute = async () => {
if (isInsertQuery) {
databaseConnection.run(query, parameters, handler);
} else {
databaseConnection.all(query, parameters, handler);
}
};
const handler = function (err: any, result: any) {
if (err) {
if (err.toString().indexOf("SQLITE_BUSY:") !== -1 &&
typeof options.busyErrorRetry === "number" &&
options.busyErrorRetry) {
setTimeout(execute, options.busyErrorRetry);
return;
}
}
// log slow queries if maxQueryExecution time is set
const maxQueryExecutionTime = connection.options.maxQueryExecutionTime;
const queryEndTime = +new Date();
const queryExecutionTime = queryEndTime - queryStartTime;
if (maxQueryExecutionTime && queryExecutionTime > maxQueryExecutionTime)
connection.logger.logQuerySlow(queryExecutionTime, query, parameters, this);
if (err) {
connection.logger.logQueryError(err, query, parameters, this);
fail(new QueryFailedError(query, parameters, err));
} else {
ok(isInsertQuery ? this["lastID"] : result);
}
};
await execute();
});
}
async startTransaction(isolationLevel?: IsolationLevel): Promise<void> {
if ((this.connection.options as SqliteConnectionOptions).enableWAL === true) {
await this.query("PRAGMA journal_mode = WAL");
}
return super.startTransaction(isolationLevel);
}
}