Skip to content

Commit

Permalink
feat: support QueryRunner.stream with Oracle (#8086)
Browse files Browse the repository at this point in the history
  • Loading branch information
imnotjames committed Aug 17, 2021
1 parent 02f0bce commit b858f84
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
31 changes: 29 additions & 2 deletions src/driver/oracle/OracleQueryRunner.ts
Expand Up @@ -233,8 +233,35 @@ export class OracleQueryRunner extends BaseQueryRunner implements QueryRunner {
/**
* Returns raw data stream.
*/
stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
throw new TypeORMError(`Stream is not supported by Oracle driver.`);
async stream(query: string, parameters?: any[], onEnd?: Function, onError?: Function): Promise<ReadStream> {
if (this.isReleased) {
throw new QueryRunnerAlreadyReleasedError();
}

const executionOptions = {
autoCommit: !this.isTransactionActive,
outFormat: this.driver.oracle.OBJECT,
}

const databaseConnection = await this.connect();

this.driver.connection.logger.logQuery(query, parameters, this);

try {
const stream = databaseConnection.queryStream(query, parameters, executionOptions);
if (onEnd) {
stream.on("end", onEnd);
}

if (onError) {
stream.on("error", onError);
}

return stream;
} catch (err) {
this.driver.connection.logger.logQueryError(err, query, parameters, this);
throw new QueryFailedError(query, parameters, err);
}
}

/**
Expand Down
27 changes: 19 additions & 8 deletions test/functional/query-runner/stream.ts
Expand Up @@ -10,7 +10,7 @@ describe("query runner > stream", () => {
before(async () => {
connections = await createTestingConnections({
entities: [Book],
enabledDrivers: [ "mysql", "cockroachdb", "postgres", "mssql" ],
enabledDrivers: [ "mysql", "cockroachdb", "postgres", "mssql", "oracle" ],
});
});
beforeEach(() => reloadTestingDatabases(connections));
Expand All @@ -24,15 +24,26 @@ describe("query runner > stream", () => {

const queryRunner = connection.createQueryRunner();

const readStream = await queryRunner.stream('SELECT * FROM book');
const query = connection.createQueryBuilder(Book, 'book')
.select()
.getQuery()

await new Promise((ok) => readStream.on('readable', ok));
const readStream = await queryRunner.stream(query);

expect(readStream.read()).to.be.eql({ ean: 'a' });
expect(readStream.read()).to.be.eql({ ean: 'b' });
expect(readStream.read()).to.be.eql({ ean: 'c' });
expect(readStream.read()).to.be.eql({ ean: 'd' });
expect(readStream.read()).to.be.null;
await new Promise((ok) => readStream.once('readable', ok));

const data: any[] = [];

readStream.on('data', (row) => data.push(row));

await new Promise((ok) => readStream.once('end', ok));

expect(data).to.have.length(4);

expect(data[0]).to.be.eql({ book_ean: 'a' });
expect(data[1]).to.be.eql({ book_ean: 'b' });
expect(data[2]).to.be.eql({ book_ean: 'c' });
expect(data[3]).to.be.eql({ book_ean: 'd' });

await queryRunner.release();
})));
Expand Down

0 comments on commit b858f84

Please sign in to comment.