diff --git a/src/driver/postgres/PostgresQueryRunner.ts b/src/driver/postgres/PostgresQueryRunner.ts index e0ff11ebb5..716d3d190d 100644 --- a/src/driver/postgres/PostgresQueryRunner.ts +++ b/src/driver/postgres/PostgresQueryRunner.ts @@ -79,10 +79,17 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner return this.databaseConnectionPromise; if (this.mode === "slave" && this.driver.isReplicated) { - this.databaseConnectionPromise = this.driver.obtainSlaveConnection().then(([ connection, release]: any[]) => { + this.databaseConnectionPromise = this.driver.obtainSlaveConnection().then(([connection, release]: any[]) => { this.driver.connectedQueryRunners.push(this); this.databaseConnection = connection; - this.releaseCallback = release; + + const onErrorCallback = () => this.release(); + this.releaseCallback = () => { + this.databaseConnection.removeListener("error", onErrorCallback); + release(); + }; + this.databaseConnection.on("error", onErrorCallback); + return this.databaseConnection; }); @@ -90,7 +97,14 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner this.databaseConnectionPromise = this.driver.obtainMasterConnection().then(([connection, release]: any[]) => { this.driver.connectedQueryRunners.push(this); this.databaseConnection = connection; - this.releaseCallback = release; + + const onErrorCallback = () => this.release(); + this.releaseCallback = () => { + this.databaseConnection.removeListener("error", onErrorCallback); + release(); + }; + this.databaseConnection.on("error", onErrorCallback); + return this.databaseConnection; }); } @@ -102,14 +116,14 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner * Releases used database connection. * You cannot use query runner methods once its released. */ - release(err?: any): Promise { + release(): Promise { if (this.isReleased) { return Promise.resolve(); } this.isReleased = true; if (this.releaseCallback) - this.releaseCallback(err); + this.releaseCallback(); const index = this.driver.connectedQueryRunners.indexOf(this); if (index !== -1) this.driver.connectedQueryRunners.splice(index); @@ -168,14 +182,7 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner this.driver.connection.logger.logQuery(query, parameters, this); const queryStartTime = +new Date(); - const onError = (err: any) => { - this.release(err); - }; - databaseConnection.once("error", onError); - databaseConnection.query(query, parameters, (err: any, result: any) => { - databaseConnection.removeListener("error", onError); - // log slow queries if maxQueryExecution time is set const maxQueryExecutionTime = this.driver.connection.options.maxQueryExecutionTime; const queryEndTime = +new Date(); diff --git a/test/github-issues/6699/issue-6699.ts b/test/github-issues/6699/issue-6699.ts new file mode 100644 index 0000000000..b7f6d525c1 --- /dev/null +++ b/test/github-issues/6699/issue-6699.ts @@ -0,0 +1,30 @@ +import {closeTestingConnections, createTestingConnections, reloadTestingDatabases} from "../../utils/test-utils"; +import {Connection} from "../../../src/connection/Connection"; +import {expect} from 'chai'; + +describe("github issues > #6699 MaxListenersExceededWarning occurs on Postgres", () => { + + let connections: Connection[]; + before(async () => connections = await createTestingConnections({ + entities: [], + enabledDrivers: ["postgres"] + })); + beforeEach(() => reloadTestingDatabases(connections)); + after(() => closeTestingConnections(connections)); + + it("queries in a transaction do not cause an EventEmitter memory leak", () => Promise.all(connections.map(async connection => { + await connection.transaction(async manager => { + const queryPromises = [...Array(10)].map( + () => manager.query('SELECT pg_sleep(0.0001)') + ); + + const pgConnection = await manager.queryRunner!.connect(); + + expect(pgConnection.listenerCount('error')).to.equal(1); + + // Wait for all of the queries to finish and drain the backlog + await Promise.all(queryPromises); + }); + }))); + +});