Fix pg-query-stream implementation (#2051)
* Fix pg-query-stream

There were some subtle bugs in the stream implementation: it behaved incorrectly and did not work as expected with async iteration. I've modified the code based on #2050 and the comments in #2035 to add better test coverage of async iterables and to update the internals significantly so they more closely match the Readable stream interface (a usage sketch of that pattern follows these commit notes).

Note: this is a __breaking__ (semver major) change to this package, as the `close` event behavior changes slightly and `highWaterMark` is no longer supported. It shouldn't impact most usage, but it is breaking regardless.

* Remove a bunch of additional code

* Add test for destroy + error propagation

* Add failing test for destroying unsubmitted stream

* Do not throw an uncatchable error when closing an unused cursor
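For context, here is a minimal usage sketch (not part of the commit) of the async-iteration pattern these changes are meant to support; it assumes the `pg` and `pg-query-stream` packages and a Postgres database reachable through the usual `PG*` environment variables:

```js
// Usage sketch only -- not code from this commit.
const pg = require('pg')
const QueryStream = require('pg-query-stream')

async function main () {
  const pool = new pg.Pool()
  const client = await pool.connect()
  try {
    // client.query(stream) returns the stream itself, which is async-iterable
    const stream = client.query(new QueryStream('SELECT * FROM generate_series(0, 100) num', []))
    for await (const row of stream) {
      console.log(row.num)
      // breaking out early destroys the stream, which in turn closes the cursor
      if (row.num >= 10) break
    }
  } finally {
    client.release()
    await pool.end()
  }
}

main().catch(console.error)
```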
brianc committed Dec 30, 2019
1 parent 6d93951 commit 8eca181
Showing 7 changed files with 138 additions and 61 deletions.
2 changes: 1 addition & 1 deletion packages/pg-cursor/index.js
@@ -182,7 +182,7 @@ Cursor.prototype.end = util.deprecate(function (cb) {
 }, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')
 
 Cursor.prototype.close = function (cb) {
-  if (this.state === 'done') {
+  if (!this.connection || this.state === 'done') {
     if (cb) {
       return setImmediate(cb)
     } else {
5 changes: 5 additions & 0 deletions packages/pg-cursor/test/close.js
@@ -46,4 +46,9 @@ describe('close', function () {
       })
     })
   })
+
+  it('is a no-op to "close" the cursor before submitting it', function (done) {
+    const cursor = new Cursor(text)
+    cursor.close(done)
+  })
 })
2 changes: 1 addition & 1 deletion packages/pg-query-stream/README.md
@@ -47,7 +47,7 @@ I'm very open to contribution! Open a pull request with your code or idea and w
 
 The MIT License (MIT)
 
-Copyright (c) 2013 Brian M. Carlson
+Copyright (c) 2013-2019 Brian M. Carlson
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
57 changes: 20 additions & 37 deletions packages/pg-query-stream/index.js
@@ -1,14 +1,12 @@
 'use strict'
-var Cursor = require('pg-cursor')
-var Readable = require('stream').Readable
+const { Readable } = require('stream')
+const Cursor = require('pg-cursor')
 
 class PgQueryStream extends Readable {
-  constructor (text, values, options) {
-    super(Object.assign({ objectMode: true }, options))
-    this.cursor = new Cursor(text, values, options)
-    this._reading = false
-    this._closed = false
-    this.batchSize = (options || {}).batchSize || 100
-
+  constructor(text, values, config = {}) {
+    const { batchSize = 100 } = config;
+    // https://nodejs.org/api/stream.html#stream_new_stream_readable_options
+    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
+    this.cursor = new Cursor(text, values, config)
     // delegate Submittable callbacks to cursor
     this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
@@ -19,40 +17,25 @@ class PgQueryStream extends Readable {
     this.handleError = this.cursor.handleError.bind(this.cursor)
   }
 
-  submit (connection) {
+  submit(connection) {
     this.cursor.submit(connection)
   }
 
-  close (callback) {
-    this._closed = true
-    const cb = callback || (() => this.emit('close'))
-    this.cursor.close(cb)
+  _destroy(_err, cb) {
+    this.cursor.close((err) => {
+      cb(err || _err)
+    })
   }
 
-  _read (size) {
-    if (this._reading || this._closed) {
-      return false
-    }
-    this._reading = true
-    const readAmount = Math.max(size, this.batchSize)
-    this.cursor.read(readAmount, (err, rows) => {
-      if (this._closed) {
-        return
-      }
+  // https://nodejs.org/api/stream.html#stream_readable_read_size_1
+  _read(size) {
+    this.cursor.read(size, (err, rows, result) => {
       if (err) {
-        return this.emit('error', err)
-      }
-      // if we get a 0 length array we've read to the end of the cursor
-      if (!rows.length) {
-        this._closed = true
-        setImmediate(() => this.emit('close'))
-        return this.push(null)
-      }
-
-      // push each row into the stream
-      this._reading = false
-      for (var i = 0; i < rows.length; i++) {
-        this.push(rows[i])
+        // https://nodejs.org/api/stream.html#stream_errors_while_reading
+        this.destroy(err)
+      } else {
+        for (const row of rows) this.push(row)
+        if (rows.length < size) this.push(null)
      }
     })
   }
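Because `close(callback)` is removed and teardown now runs through the standard `_destroy` hook, a caller that previously stopped a stream with `query.close(cb)` would now use `destroy()` and listen for `'close'` and `'error'`. A rough migration sketch, assuming a connected `client` as in the earlier example:

```js
// Migration sketch only -- the old close(callback) API no longer exists on the stream.
const stream = client.query(new QueryStream('SELECT * FROM generate_series(0, 20000) num', []))

stream.once('readable', () => {
  stream.read()
  // Before this commit: stream.close(callback)
  // Now: destroy() and wait for 'close', which fires because the stream is built with emitClose: true
  stream.destroy()
})

stream.on('error', (err) => console.error('stream failed', err))
stream.on('close', () => console.log('cursor closed, stream torn down'))
```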
55 changes: 55 additions & 0 deletions packages/pg-query-stream/test/async-iterator.es6
@@ -54,4 +54,59 @@ describe('Async iterator', () => {
     assert.equal(allRows.length, 603)
     await pool.end()
   })
+
+  it('can break out of iteration early', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of client.query(new QueryStream(queryText, []))) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of client.query(new QueryStream(queryText, []))) {
+      rows.push(row)
+      break;
+    }
+    assert.strictEqual(rows.length, 3)
+    client.release()
+    await pool.end()
+  })
+
+  it('only returns rows on first iteration', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    const stream = client.query(new QueryStream(queryText, []))
+    for await (const row of stream) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of stream) {
+      rows.push(row)
+    }
+    for await (const row of stream) {
+      rows.push(row)
+    }
+    assert.strictEqual(rows.length, 1)
+    client.release()
+    await pool.end()
+  })
+
+  it('can read with delays', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
+    for await (const row of stream) {
+      rows.push(row)
+      await new Promise((resolve) => setTimeout(resolve, 1))
+    }
+    assert.strictEqual(rows.length, 201)
+    client.release()
+    await pool.end()
+  })
 })
74 changes: 55 additions & 19 deletions packages/pg-query-stream/test/close.js
@@ -4,49 +4,85 @@ var concat = require('concat-stream')
 var QueryStream = require('../')
 var helper = require('./helper')
 
+if (process.version.startsWith('v8.')) {
+  return console.error('warning! node versions less than 10lts no longer supported & stream closing semantics may not behave properly');
+}
+
 helper('close', function (client) {
   it('emits close', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
+    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 })
     var query = client.query(stream)
-    query.pipe(concat(function () {}))
+    query.pipe(concat(function () { }))
     query.on('close', done)
   })
 })
 
 helper('early close', function (client) {
   it('can be closed early', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
+    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], { batchSize: 2, highWaterMark: 2 })
     var query = client.query(stream)
     var readCount = 0
     query.on('readable', function () {
       readCount++
      query.read()
     })
     query.once('readable', function () {
-      query.close()
+      query.destroy()
     })
     query.on('close', function () {
       assert(readCount < 10, 'should not have read more than 10 rows')
       done()
     })
   })
-})
 
-helper('close callback', function (client) {
-  it('notifies an optional callback when the conneciton is closed', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
-    var query = client.query(stream)
-    query.once('readable', function () { // only reading once
-      query.read()
-    })
-    query.once('readable', function () {
-      query.close(function () {
-        // nothing to assert. This test will time out if the callback does not work.
-        done()
+  it('can destroy stream while reading', function (done) {
+    var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    setTimeout(() => {
+      stream.destroy()
+      stream.on('close', done)
+    }, 100)
+  })
+
+  it('emits an error when calling destroy with an error', function (done) {
+    var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    setTimeout(() => {
+      stream.destroy(new Error('intentional error'))
+      stream.on('error', (err) => {
+        // make sure there's an error
+        assert(err);
+        assert.strictEqual(err.message, 'intentional error');
+        done();
      })
+    }, 100)
+  })
+
+  it('can destroy stream while reading an error', function (done) {
+    var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.once('error', () => {
+      stream.destroy()
+      // wait a bit to let any other errors shake through
+      setTimeout(done, 100)
    })
-    query.on('close', function () {
-      assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
-    })
  })
+
+  it('does not crash when destroying the stream immediately after calling read', function (done) {
+    var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.destroy()
+    stream.on('close', done)
+  })
+
+  it('does not crash when destroying the stream before its submitted', function (done) {
+    var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.destroy()
+    stream.on('close', done)
+  })
 })
4 changes: 1 addition & 3 deletions packages/pg-query-stream/test/config.js
@@ -2,9 +2,7 @@ var assert = require('assert')
 var QueryStream = require('../')
 
 var stream = new QueryStream('SELECT NOW()', [], {
-  highWaterMark: 999,
   batchSize: 88
 })
 
-assert.equal(stream._readableState.highWaterMark, 999)
-assert.equal(stream.batchSize, 88)
+assert.equal(stream._readableState.highWaterMark, 88)

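As the updated test shows, `batchSize` now also serves as the stream's high-water mark, and a separately passed `highWaterMark` is ignored by the new constructor. A quick sketch of that behavior (not part of the test suite):

```js
const QueryStream = require('pg-query-stream')

// batchSize doubles as the readable high-water mark; a highWaterMark option is no longer honored
const stream = new QueryStream('SELECT NOW()', [], { batchSize: 88, highWaterMark: 999 })
console.log(stream._readableState.highWaterMark) // 88
```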