diff --git a/packages/pg-query-stream/src/index.ts b/packages/pg-query-stream/src/index.ts index 033c08af2..c942b0441 100644 --- a/packages/pg-query-stream/src/index.ts +++ b/packages/pg-query-stream/src/index.ts @@ -2,15 +2,17 @@ import { Readable } from 'stream' import { Submittable, Connection } from 'pg' import Cursor from 'pg-cursor' -interface PgQueryStreamConfig { +interface QueryStreamConfig { batchSize?: number highWaterMark?: number rowMode?: 'array' types?: any } -class PgQueryStream extends Readable implements Submittable { +class QueryStream extends Readable implements Submittable { cursor: any + _result: any + handleRowDescription: Function handleDataRow: Function handlePortalSuspended: Function @@ -19,9 +21,7 @@ class PgQueryStream extends Readable implements Submittable { handleError: Function handleEmptyQuery: Function - _result: any - - constructor(text: string, values?: any[], config: PgQueryStreamConfig = {}) { + public constructor(text: string, values?: any[], config: QueryStreamConfig = {}) { const { batchSize, highWaterMark = 100 } = config super({ objectMode: true, autoDestroy: true, highWaterMark: batchSize || highWaterMark }) @@ -40,20 +40,21 @@ class PgQueryStream extends Readable implements Submittable { this._result = this.cursor._result } - submit(connection: Connection): void { + public submit(connection: Connection): void { this.cursor.submit(connection) } - _destroy(_err: Error, cb: Function) { + public _destroy(_err: Error, cb: Function) { this.cursor.close((err?: Error) => { cb(err || _err) }) } // https://nodejs.org/api/stream.html#stream_readable_read_size_1 - _read(size: number) { - this.cursor.read(size, (err: Error, rows: any[], result: any) => { + public _read(size: number) { + this.cursor.read(size, (err: Error, rows: any[]) => { if (err) { + // https://nodejs.org/api/stream.html#stream_errors_while_reading this.destroy(err) } else { for (const row of rows) this.push(row) @@ -63,4 +64,4 @@ class PgQueryStream extends Readable implements Submittable { } } -export = PgQueryStream +export = QueryStream