diff --git a/packages/pg-query-stream/src/index.ts b/packages/pg-query-stream/src/index.ts index c942b0441..f3251b919 100644 --- a/packages/pg-query-stream/src/index.ts +++ b/packages/pg-query-stream/src/index.ts @@ -13,6 +13,7 @@ class QueryStream extends Readable implements Submittable { cursor: any _result: any + callback: Function handleRowDescription: Function handleDataRow: Function handlePortalSuspended: Function @@ -26,6 +27,11 @@ class QueryStream extends Readable implements Submittable { super({ objectMode: true, autoDestroy: true, highWaterMark: batchSize || highWaterMark }) this.cursor = new Cursor(text, values, config) + this.cursor.on('end', (result) => { + this.callback && this.callback(null, result) + }).on('error', (err) => { + this.callback && this.callback(err) + }) // delegate Submittable callbacks to cursor this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index c6aa3dabe..79a1d3d01 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -511,8 +511,12 @@ class Client extends EventEmitter { } else if (typeof config.submit === 'function') { readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config - if (typeof values === 'function') { - query.callback = query.callback || values + if (!query.callback) { + if (typeof values === 'function') { + query.callback = values + } else if (callback) { + query.callback = callback + } } } else { readTimeout = this.connectionParameters.query_timeout