Skip to content

Commit

Permalink
fix: actually return a working ReadStream from SQL Server query runner (
Browse files Browse the repository at this point in the history
#7893)

the sql server query runner wasn't actually returning a ReadStream as expected
and there were no tests that confirmed the stream functionality.  this corrects
both of those problems
  • Loading branch information
imnotjames committed Jul 11, 2021
1 parent 1f64da2 commit e80985f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 24 deletions.
62 changes: 38 additions & 24 deletions src/driver/sqlserver/SqlServerQueryRunner.ts
Expand Up @@ -26,6 +26,7 @@ import {SqlServerDriver} from "./SqlServerDriver";
import {ReplicationMode} from "../types/ReplicationMode";
import {BroadcasterResult} from "../../subscriber/BroadcasterResult";
import { TypeORMError } from "../../error";
import { PassThrough } from "stream";

/**
* Runs queries on a single SQL Server database connection.
Expand Down Expand Up @@ -299,6 +300,8 @@ export class SqlServerQueryRunner extends BaseQueryRunner implements QueryRunner
if (this.isReleased)
throw new QueryRunnerAlreadyReleasedError();

let promise: Promise<ReadStream>;

let waitingOkay: Function;
const waitingPromise = new Promise((ok) => waitingOkay = ok);
if (this.queryResponsibilityChain.length) {
Expand All @@ -307,7 +310,18 @@ export class SqlServerQueryRunner extends BaseQueryRunner implements QueryRunner
await Promise.all(otherWaitingPromises);
}

const promise = new Promise<ReadStream>(async (ok, fail) => {
const resolveChain = () => {
let promiseIndex = this.queryResponsibilityChain.indexOf(promise);
let waitingPromiseIndex = this.queryResponsibilityChain.indexOf(waitingPromise);

if (promiseIndex !== -1)
this.queryResponsibilityChain.splice(promiseIndex, 1);
if (waitingPromiseIndex !== -1)
this.queryResponsibilityChain.splice(waitingPromiseIndex, 1);
waitingOkay();
};

promise = new Promise<ReadStream>(async (ok, fail) => {

this.driver.connection.logger.logQuery(query, parameters, this);
const pool = await (this.mode === "slave" ? this.driver.obtainSlaveConnection() : this.driver.obtainMasterConnection());
Expand All @@ -323,30 +337,30 @@ export class SqlServerQueryRunner extends BaseQueryRunner implements QueryRunner
}
});
}
request.query(query, (err: any, result: any) => {

const resolveChain = () => {
if (promiseIndex !== -1)
this.queryResponsibilityChain.splice(promiseIndex, 1);
if (waitingPromiseIndex !== -1)
this.queryResponsibilityChain.splice(waitingPromiseIndex, 1);
waitingOkay();
};

let promiseIndex = this.queryResponsibilityChain.indexOf(promise);
let waitingPromiseIndex = this.queryResponsibilityChain.indexOf(waitingPromise);
if (err) {
this.driver.connection.logger.logQueryError(err, query, parameters, this);
resolveChain();
return fail(err);
}

ok(result.recordset);
resolveChain();
});
if (onEnd) request.on("done", onEnd);
if (onError) request.on("error", onError);
ok(request as ReadStream);
request.query(query);

// Any event should release the lock.
request.once("row", resolveChain);
request.once("rowsaffected", resolveChain);
request.once("done", resolveChain);
request.once("error", resolveChain);

request.on("error", (err: any) => {
this.driver.connection.logger.logQueryError(err, query, parameters, this);
fail(err);
})

if (onEnd) {
request.on("done", onEnd);
}

if (onError) {
request.on("error", onError);
}

// This can be done with request.getReadStream() in node-mssql 7.0.0
ok(request.pipe(new PassThrough({ objectMode: true })));
});
if (this.isTransactionActive)
this.queryResponsibilityChain.push(promise);
Expand Down
40 changes: 40 additions & 0 deletions test/functional/query-runner/stream.ts
@@ -0,0 +1,40 @@
import "reflect-metadata";
import { Connection } from "../../../src";
import { expect } from "chai";
import { closeTestingConnections, createTestingConnections, reloadTestingDatabases } from "../../utils/test-utils";
import { Book } from "./entity/Book";

describe("query runner > stream", () => {

let connections: Connection[];
before(async () => {
connections = await createTestingConnections({
entities: [Book],
enabledDrivers: [ "mysql", "cockroachdb", "postgres", "mssql" ],
});
});
beforeEach(() => reloadTestingDatabases(connections));
after(() => closeTestingConnections(connections));

it("should stream data", () => Promise.all(connections.map(async connection => {
await connection.manager.save(Book, { ean: 'a' });
await connection.manager.save(Book, { ean: 'b' });
await connection.manager.save(Book, { ean: 'c' });
await connection.manager.save(Book, { ean: 'd' });

const queryRunner = connection.createQueryRunner();

const readStream = await queryRunner.stream('SELECT * FROM book');

await new Promise((ok) => readStream.on('readable', ok));

expect(readStream.read()).to.be.eql({ ean: 'a' });
expect(readStream.read()).to.be.eql({ ean: 'b' });
expect(readStream.read()).to.be.eql({ ean: 'c' });
expect(readStream.read()).to.be.eql({ ean: 'd' });
expect(readStream.read()).to.be.null;

await queryRunner.release();
})));

});

0 comments on commit e80985f

Please sign in to comment.