From e3cb40c3612089f11210921ce457e296c56a83e0 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Sat, 28 Dec 2019 17:08:41 +0000 Subject: [PATCH 1/5] Fix pg-query-stream There were some subtle behaviors with the stream being implemented incorrectly & not working as expected with async iteration. I've modified the code based on #2050 and comments in #2035 to have better test coverage of async iterables and update the internals significantly to more closely match the readable stream interface. Note: this is a __breaking__ (semver major) change to this package as the close event behavior is changed slightly, and `highWaterMark` is no longer supported. It shouldn't impact most usage, but breaking regardless. --- packages/pg-query-stream/index.js | 79 +++++++++++-------- .../pg-query-stream/test/async-iterator.es6 | 55 +++++++++++++ packages/pg-query-stream/test/close.js | 11 +-- packages/pg-query-stream/test/config.js | 4 +- 4 files changed, 106 insertions(+), 43 deletions(-) diff --git a/packages/pg-query-stream/index.js b/packages/pg-query-stream/index.js index 9c34207ec..23b761e36 100644 --- a/packages/pg-query-stream/index.js +++ b/packages/pg-query-stream/index.js @@ -1,14 +1,16 @@ -'use strict' -var Cursor = require('pg-cursor') -var Readable = require('stream').Readable +const { Readable } = require('stream') +const Cursor = require('pg-cursor') class PgQueryStream extends Readable { - constructor (text, values, options) { - super(Object.assign({ objectMode: true }, options)) - this.cursor = new Cursor(text, values, options) + constructor(text, values, config = {}) { + const { batchSize = 100 } = config; + // https://nodejs.org/api/stream.html#stream_new_stream_readable_options + super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize }) + this.cursor = new Cursor(text, values, config) + this._reading = false - this._closed = false - this.batchSize = (options || {}).batchSize || 100 + this._callbacks = [] + this._err = undefined; // delegate Submittable callbacks to cursor this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) @@ -19,40 +21,51 @@ class PgQueryStream extends Readable { this.handleError = this.cursor.handleError.bind(this.cursor) } - submit (connection) { + submit(connection) { this.cursor.submit(connection) } - close (callback) { - this._closed = true - const cb = callback || (() => this.emit('close')) - this.cursor.close(cb) + close(callback) { + if (this.destroyed) { + if (callback) setImmediate(callback) + } else { + if (callback) this.once('close', callback) + this.destroy() + } + } + + _close() { + this.cursor.close((err) => { + let cb + while ((cb = this._callbacks.pop())) cb(err || this._err) + }) } - _read (size) { - if (this._reading || this._closed) { - return false + _destroy(_err, callback) { + this._err = _err; + this._callbacks.push(callback) + if (!this._reading) { + this._close() } + } + + // https://nodejs.org/api/stream.html#stream_readable_read_size_1 + _read(size) { + // Prevent _destroy() from closing while reading this._reading = true - const readAmount = Math.max(size, this.batchSize) - this.cursor.read(readAmount, (err, rows) => { - if (this._closed) { - return - } - if (err) { - return this.emit('error', err) - } - // if we get a 0 length array we've read to the end of the cursor - if (!rows.length) { - this._closed = true - setImmediate(() => this.emit('close')) - return this.push(null) - } - // push each row into the stream + this.cursor.read(size, (err, rows, result) => { this._reading = false - for (var i = 0; i < rows.length; i++) { - this.push(rows[i]) + + if (this.destroyed) { + // Destroyed while reading? + this._close() + } else if (err) { + // https://nodejs.org/api/stream.html#stream_errors_while_reading + this.destroy(err) + } else { + for (const row of rows) this.push(row) + if (rows.length < size) this.push(null) } }) } diff --git a/packages/pg-query-stream/test/async-iterator.es6 b/packages/pg-query-stream/test/async-iterator.es6 index e84089b6c..35d94aa73 100644 --- a/packages/pg-query-stream/test/async-iterator.es6 +++ b/packages/pg-query-stream/test/async-iterator.es6 @@ -54,4 +54,59 @@ describe('Async iterator', () => { assert.equal(allRows.length, 603) await pool.end() }) + + it('can break out of iteration early', async () => { + const pool = new pg.Pool({ max: 1 }) + const client = await pool.connect() + const rows = [] + for await (const row of client.query(new QueryStream(queryText, []))) { + rows.push(row) + break; + } + for await (const row of client.query(new QueryStream(queryText, []))) { + rows.push(row) + break; + } + for await (const row of client.query(new QueryStream(queryText, []))) { + rows.push(row) + break; + } + assert.strictEqual(rows.length, 3) + client.release() + await pool.end() + }) + + it('only returns rows on first iteration', async () => { + const pool = new pg.Pool({ max: 1 }) + const client = await pool.connect() + const rows = [] + const stream = client.query(new QueryStream(queryText, [])) + for await (const row of stream) { + rows.push(row) + break; + } + for await (const row of stream) { + rows.push(row) + } + for await (const row of stream) { + rows.push(row) + } + assert.strictEqual(rows.length, 1) + client.release() + await pool.end() + }) + + it('can read with delays', async () => { + const pool = new pg.Pool({ max: 1 }) + const client = await pool.connect() + const rows = [] + const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 })) + for await (const row of stream) { + rows.push(row) + await new Promise((resolve) => setTimeout(resolve, 1)) + } + assert.strictEqual(rows.length, 201) + client.release() + await pool.end() + }) }) diff --git a/packages/pg-query-stream/test/close.js b/packages/pg-query-stream/test/close.js index be103c7f6..3e6ede948 100644 --- a/packages/pg-query-stream/test/close.js +++ b/packages/pg-query-stream/test/close.js @@ -6,16 +6,16 @@ var helper = require('./helper') helper('close', function (client) { it('emits close', function (done) { - var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2}) + var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 }) var query = client.query(stream) - query.pipe(concat(function () {})) + query.pipe(concat(function () { })) query.on('close', done) }) }) helper('early close', function (client) { it('can be closed early', function (done) { - var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2}) + var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], { batchSize: 2, highWaterMark: 2 }) var query = client.query(stream) var readCount = 0 query.on('readable', function () { @@ -34,7 +34,7 @@ helper('early close', function (client) { helper('close callback', function (client) { it('notifies an optional callback when the conneciton is closed', function (done) { - var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2}) + var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], { batchSize: 2, highWaterMark: 2 }) var query = client.query(stream) query.once('readable', function () { // only reading once query.read() @@ -45,8 +45,5 @@ helper('close callback', function (client) { done() }) }) - query.on('close', function () { - assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream. - }) }) }) diff --git a/packages/pg-query-stream/test/config.js b/packages/pg-query-stream/test/config.js index 4ed5b1b93..78251c894 100644 --- a/packages/pg-query-stream/test/config.js +++ b/packages/pg-query-stream/test/config.js @@ -2,9 +2,7 @@ var assert = require('assert') var QueryStream = require('../') var stream = new QueryStream('SELECT NOW()', [], { - highWaterMark: 999, batchSize: 88 }) -assert.equal(stream._readableState.highWaterMark, 999) -assert.equal(stream.batchSize, 88) +assert.equal(stream._readableState.highWaterMark, 88) From daa7523be83bcb431980f8011197a60857cf40ac Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Sun, 29 Dec 2019 17:25:30 +0000 Subject: [PATCH 2/5] Remove a bunch of additional code --- packages/pg-query-stream/README.md | 2 +- packages/pg-query-stream/index.js | 36 ++---------------- .../pg-query-stream/test/async-iterator.es6 | 2 +- packages/pg-query-stream/test/close.js | 37 ++++++++++++------- 4 files changed, 28 insertions(+), 49 deletions(-) diff --git a/packages/pg-query-stream/README.md b/packages/pg-query-stream/README.md index d00550aec..312387aa7 100644 --- a/packages/pg-query-stream/README.md +++ b/packages/pg-query-stream/README.md @@ -47,7 +47,7 @@ I'm very open to contribution! Open a pull request with your code or idea and w The MIT License (MIT) -Copyright (c) 2013 Brian M. Carlson +Copyright (c) 2013-2019 Brian M. Carlson Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/packages/pg-query-stream/index.js b/packages/pg-query-stream/index.js index 23b761e36..1258cc204 100644 --- a/packages/pg-query-stream/index.js +++ b/packages/pg-query-stream/index.js @@ -8,10 +8,6 @@ class PgQueryStream extends Readable { super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize }) this.cursor = new Cursor(text, values, config) - this._reading = false - this._callbacks = [] - this._err = undefined; - // delegate Submittable callbacks to cursor this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor) @@ -25,42 +21,16 @@ class PgQueryStream extends Readable { this.cursor.submit(connection) } - close(callback) { - if (this.destroyed) { - if (callback) setImmediate(callback) - } else { - if (callback) this.once('close', callback) - this.destroy() - } - } - - _close() { + _destroy(_err, cb) { this.cursor.close((err) => { - let cb - while ((cb = this._callbacks.pop())) cb(err || this._err) + cb && cb(err || _err) }) } - _destroy(_err, callback) { - this._err = _err; - this._callbacks.push(callback) - if (!this._reading) { - this._close() - } - } - // https://nodejs.org/api/stream.html#stream_readable_read_size_1 _read(size) { - // Prevent _destroy() from closing while reading - this._reading = true - this.cursor.read(size, (err, rows, result) => { - this._reading = false - - if (this.destroyed) { - // Destroyed while reading? - this._close() - } else if (err) { + if (err) { // https://nodejs.org/api/stream.html#stream_errors_while_reading this.destroy(err) } else { diff --git a/packages/pg-query-stream/test/async-iterator.es6 b/packages/pg-query-stream/test/async-iterator.es6 index 35d94aa73..47bda86d2 100644 --- a/packages/pg-query-stream/test/async-iterator.es6 +++ b/packages/pg-query-stream/test/async-iterator.es6 @@ -59,7 +59,7 @@ describe('Async iterator', () => { const pool = new pg.Pool({ max: 1 }) const client = await pool.connect() const rows = [] - for await (const row of client.query(new QueryStream(queryText, []))) { + for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) { rows.push(row) break; } diff --git a/packages/pg-query-stream/test/close.js b/packages/pg-query-stream/test/close.js index 3e6ede948..b08b77277 100644 --- a/packages/pg-query-stream/test/close.js +++ b/packages/pg-query-stream/test/close.js @@ -4,6 +4,10 @@ var concat = require('concat-stream') var QueryStream = require('../') var helper = require('./helper') +if (process.version.startsWith('v8.')) { + return console.error('warning! node versions less than 10lts no longer supported & stream closing semantics may not behave properly'); +} + helper('close', function (client) { it('emits close', function (done) { var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 }) @@ -23,27 +27,32 @@ helper('early close', function (client) { query.read() }) query.once('readable', function () { - query.close() + query.destroy() }) query.on('close', function () { assert(readCount < 10, 'should not have read more than 10 rows') done() }) }) -}) -helper('close callback', function (client) { - it('notifies an optional callback when the conneciton is closed', function (done) { - var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], { batchSize: 2, highWaterMark: 2 }) - var query = client.query(stream) - query.once('readable', function () { // only reading once - query.read() - }) - query.once('readable', function () { - query.close(function () { - // nothing to assert. This test will time out if the callback does not work. - done() - }) + it('can destroy stream while reading', function (done) { + var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)') + client.query(stream) + stream.on('data', () => done(new Error('stream should not have returned rows'))) + setTimeout(() => { + stream.destroy() + stream.on('close', done) + }, 100) + }) + + it('can destroy stream while reading an error', function (done) { + var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;') + client.query(stream) + stream.on('data', () => done(new Error('stream should not have returned rows'))) + stream.once('error', () => { + stream.destroy() + // wait a bit to let any other errors shake through + setTimeout(done, 100) }) }) }) From 7875c48efbda6083f50db3d86b884e57bf04ad9e Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Sun, 29 Dec 2019 18:31:18 +0000 Subject: [PATCH 3/5] Add test for destroy + error propagation --- packages/pg-query-stream/index.js | 2 +- packages/pg-query-stream/test/close.js | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/packages/pg-query-stream/index.js b/packages/pg-query-stream/index.js index 1258cc204..4576f5fb5 100644 --- a/packages/pg-query-stream/index.js +++ b/packages/pg-query-stream/index.js @@ -23,7 +23,7 @@ class PgQueryStream extends Readable { _destroy(_err, cb) { this.cursor.close((err) => { - cb && cb(err || _err) + cb(err || _err) }) } diff --git a/packages/pg-query-stream/test/close.js b/packages/pg-query-stream/test/close.js index b08b77277..d1a4ee566 100644 --- a/packages/pg-query-stream/test/close.js +++ b/packages/pg-query-stream/test/close.js @@ -45,6 +45,21 @@ helper('early close', function (client) { }, 100) }) + it('emits an error when calling destroy with an error', function (done) { + var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)') + client.query(stream) + stream.on('data', () => done(new Error('stream should not have returned rows'))) + setTimeout(() => { + stream.destroy(new Error('intentional error')) + stream.on('error', (err) => { + // make sure there's an error + assert(err); + assert.strictEqual(err.message, 'intentional error'); + done(); + }) + }, 100) + }) + it('can destroy stream while reading an error', function (done) { var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;') client.query(stream) From 5eaad2f76ef91ee49eeda4bf4418f2536b9b650a Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Sun, 29 Dec 2019 18:42:02 +0000 Subject: [PATCH 4/5] Add failing test for destroying unsubmitted stream --- packages/pg-query-stream/test/close.js | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/packages/pg-query-stream/test/close.js b/packages/pg-query-stream/test/close.js index d1a4ee566..d7e44b675 100644 --- a/packages/pg-query-stream/test/close.js +++ b/packages/pg-query-stream/test/close.js @@ -61,7 +61,7 @@ helper('early close', function (client) { }) it('can destroy stream while reading an error', function (done) { - var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;') + var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;') client.query(stream) stream.on('data', () => done(new Error('stream should not have returned rows'))) stream.once('error', () => { @@ -70,4 +70,19 @@ helper('early close', function (client) { setTimeout(done, 100) }) }) + + it('does not crash when destroying the stream immediately after calling read', function (done) { + var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);') + client.query(stream) + stream.on('data', () => done(new Error('stream should not have returned rows'))) + stream.destroy() + stream.on('close', done) + }) + + it('does not crash when destroying the stream before its submitted', function (done) { + var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);') + stream.on('data', () => done(new Error('stream should not have returned rows'))) + stream.destroy() + stream.on('close', done) + }) }) From 91d0a72ace7f8dc9d77f8b2a15b541a86f2857e1 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Sun, 29 Dec 2019 15:54:38 -0800 Subject: [PATCH 5/5] Do not throw an uncatchable error when closing an unused cursor --- packages/pg-cursor/index.js | 2 +- packages/pg-cursor/test/close.js | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index 624877680..727fe9081 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -182,7 +182,7 @@ Cursor.prototype.end = util.deprecate(function (cb) { }, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.') Cursor.prototype.close = function (cb) { - if (this.state === 'done') { + if (!this.connection || this.state === 'done') { if (cb) { return setImmediate(cb) } else { diff --git a/packages/pg-cursor/test/close.js b/packages/pg-cursor/test/close.js index 785a71098..e63512abd 100644 --- a/packages/pg-cursor/test/close.js +++ b/packages/pg-cursor/test/close.js @@ -46,4 +46,9 @@ describe('close', function () { }) }) }) + + it('is a no-op to "close" the cursor before submitting it', function (done) { + const cursor = new Cursor(text) + cursor.close(done) + }) })