Skip to content

Commit

Permalink
fix: prevent multiple release listeners in PostgresQueryRunner (#6708)
Browse files Browse the repository at this point in the history
move on-error-release code to the queryRunner connect function
so we only need to have one listener per query runner on each
connection - cutting donw the number of listeners total &
preventing a problem with too many listeners

closes #6699
  • Loading branch information
imnotjames committed Sep 14, 2020
1 parent 7a52f18 commit 208cf6b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 12 deletions.
31 changes: 19 additions & 12 deletions src/driver/postgres/PostgresQueryRunner.ts
Expand Up @@ -79,18 +79,32 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner
return this.databaseConnectionPromise;

if (this.mode === "slave" && this.driver.isReplicated) {
this.databaseConnectionPromise = this.driver.obtainSlaveConnection().then(([ connection, release]: any[]) => {
this.databaseConnectionPromise = this.driver.obtainSlaveConnection().then(([connection, release]: any[]) => {
this.driver.connectedQueryRunners.push(this);
this.databaseConnection = connection;
this.releaseCallback = release;

const onErrorCallback = () => this.release();
this.releaseCallback = () => {
this.databaseConnection.removeListener("error", onErrorCallback);
release();
};
this.databaseConnection.on("error", onErrorCallback);

return this.databaseConnection;
});

} else { // master
this.databaseConnectionPromise = this.driver.obtainMasterConnection().then(([connection, release]: any[]) => {
this.driver.connectedQueryRunners.push(this);
this.databaseConnection = connection;
this.releaseCallback = release;

const onErrorCallback = () => this.release();
this.releaseCallback = () => {
this.databaseConnection.removeListener("error", onErrorCallback);
release();
};
this.databaseConnection.on("error", onErrorCallback);

return this.databaseConnection;
});
}
Expand All @@ -102,14 +116,14 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner
* Releases used database connection.
* You cannot use query runner methods once its released.
*/
release(err?: any): Promise<void> {
release(): Promise<void> {
if (this.isReleased) {
return Promise.resolve();
}

this.isReleased = true;
if (this.releaseCallback)
this.releaseCallback(err);
this.releaseCallback();

const index = this.driver.connectedQueryRunners.indexOf(this);
if (index !== -1) this.driver.connectedQueryRunners.splice(index);
Expand Down Expand Up @@ -168,14 +182,7 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner
this.driver.connection.logger.logQuery(query, parameters, this);
const queryStartTime = +new Date();

const onError = (err: any) => {
this.release(err);
};
databaseConnection.once("error", onError);

databaseConnection.query(query, parameters, (err: any, result: any) => {
databaseConnection.removeListener("error", onError);

// log slow queries if maxQueryExecution time is set
const maxQueryExecutionTime = this.driver.connection.options.maxQueryExecutionTime;
const queryEndTime = +new Date();
Expand Down
30 changes: 30 additions & 0 deletions test/github-issues/6699/issue-6699.ts
@@ -0,0 +1,30 @@
import {closeTestingConnections, createTestingConnections, reloadTestingDatabases} from "../../utils/test-utils";
import {Connection} from "../../../src/connection/Connection";
import {expect} from 'chai';

describe("github issues > #6699 MaxListenersExceededWarning occurs on Postgres", () => {

let connections: Connection[];
before(async () => connections = await createTestingConnections({
entities: [],
enabledDrivers: ["postgres"]
}));
beforeEach(() => reloadTestingDatabases(connections));
after(() => closeTestingConnections(connections));

it("queries in a transaction do not cause an EventEmitter memory leak", () => Promise.all(connections.map(async connection => {
await connection.transaction(async manager => {
const queryPromises = [...Array(10)].map(
() => manager.query('SELECT pg_sleep(0.0001)')
);

const pgConnection = await manager.queryRunner!.connect();

expect(pgConnection.listenerCount('error')).to.equal(1);

// Wait for all of the queries to finish and drain the backlog
await Promise.all(queryPromises);
});
})));

});

0 comments on commit 208cf6b

Please sign in to comment.