Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent multiple release listeners in PostgresQueryRunner #6708

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 19 additions & 12 deletions src/driver/postgres/PostgresQueryRunner.ts
Expand Up @@ -79,18 +79,32 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner
return this.databaseConnectionPromise;

if (this.mode === "slave" && this.driver.isReplicated) {
this.databaseConnectionPromise = this.driver.obtainSlaveConnection().then(([ connection, release]: any[]) => {
this.databaseConnectionPromise = this.driver.obtainSlaveConnection().then(([connection, release]: any[]) => {
this.driver.connectedQueryRunners.push(this);
this.databaseConnection = connection;
this.releaseCallback = release;

const onErrorCallback = () => this.release();
this.releaseCallback = () => {
this.databaseConnection.removeListener("error", onErrorCallback);
release();
};
this.databaseConnection.on("error", onErrorCallback);

return this.databaseConnection;
});

} else { // master
this.databaseConnectionPromise = this.driver.obtainMasterConnection().then(([connection, release]: any[]) => {
this.driver.connectedQueryRunners.push(this);
this.databaseConnection = connection;
this.releaseCallback = release;

const onErrorCallback = () => this.release();
this.releaseCallback = () => {
this.databaseConnection.removeListener("error", onErrorCallback);
release();
};
this.databaseConnection.on("error", onErrorCallback);

return this.databaseConnection;
});
}
Expand All @@ -102,14 +116,14 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner
* Releases used database connection.
* You cannot use query runner methods once its released.
*/
release(err?: any): Promise<void> {
release(): Promise<void> {
if (this.isReleased) {
return Promise.resolve();
}

this.isReleased = true;
if (this.releaseCallback)
this.releaseCallback(err);
this.releaseCallback();

const index = this.driver.connectedQueryRunners.indexOf(this);
if (index !== -1) this.driver.connectedQueryRunners.splice(index);
Expand Down Expand Up @@ -168,14 +182,7 @@ export class PostgresQueryRunner extends BaseQueryRunner implements QueryRunner
this.driver.connection.logger.logQuery(query, parameters, this);
const queryStartTime = +new Date();

const onError = (err: any) => {
this.release(err);
};
databaseConnection.once("error", onError);

databaseConnection.query(query, parameters, (err: any, result: any) => {
databaseConnection.removeListener("error", onError);

// log slow queries if maxQueryExecution time is set
const maxQueryExecutionTime = this.driver.connection.options.maxQueryExecutionTime;
const queryEndTime = +new Date();
Expand Down
30 changes: 30 additions & 0 deletions test/github-issues/6699/issue-6699.ts
@@ -0,0 +1,30 @@
import {closeTestingConnections, createTestingConnections, reloadTestingDatabases} from "../../utils/test-utils";
import {Connection} from "../../../src/connection/Connection";
import {expect} from 'chai';

describe("github issues > #6699 MaxListenersExceededWarning occurs on Postgres", () => {

let connections: Connection[];
before(async () => connections = await createTestingConnections({
entities: [],
enabledDrivers: ["postgres"]
}));
beforeEach(() => reloadTestingDatabases(connections));
after(() => closeTestingConnections(connections));

it("queries in a transaction do not cause an EventEmitter memory leak", () => Promise.all(connections.map(async connection => {
await connection.transaction(async manager => {
const queryPromises = [...Array(10)].map(
() => manager.query('SELECT pg_sleep(0.0001)')
);

const pgConnection = await manager.queryRunner!.connect();

expect(pgConnection.listenerCount('error')).to.equal(1);

// Wait for all of the queries to finish and drain the backlog
await Promise.all(queryPromises);
});
})));

});