Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pg-query-stream implementation #2051

Merged
merged 5 commits into from Dec 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/pg-cursor/index.js
Expand Up @@ -182,7 +182,7 @@ Cursor.prototype.end = util.deprecate(function (cb) {
}, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')

Cursor.prototype.close = function (cb) {
if (this.state === 'done') {
if (!this.connection || this.state === 'done') {
if (cb) {
return setImmediate(cb)
} else {
Expand Down
5 changes: 5 additions & 0 deletions packages/pg-cursor/test/close.js
Expand Up @@ -46,4 +46,9 @@ describe('close', function () {
})
})
})

it('is a no-op to "close" the cursor before submitting it', function (done) {
const cursor = new Cursor(text)
cursor.close(done)
})
})
2 changes: 1 addition & 1 deletion packages/pg-query-stream/README.md
Expand Up @@ -47,7 +47,7 @@ I'm very open to contribution! Open a pull request with your code or idea and w

The MIT License (MIT)

Copyright (c) 2013 Brian M. Carlson
Copyright (c) 2013-2019 Brian M. Carlson

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
57 changes: 20 additions & 37 deletions packages/pg-query-stream/index.js
@@ -1,14 +1,12 @@
'use strict'
var Cursor = require('pg-cursor')
var Readable = require('stream').Readable
const { Readable } = require('stream')
const Cursor = require('pg-cursor')

class PgQueryStream extends Readable {
constructor (text, values, options) {
super(Object.assign({ objectMode: true }, options))
this.cursor = new Cursor(text, values, options)
this._reading = false
this._closed = false
this.batchSize = (options || {}).batchSize || 100
constructor(text, values, config = {}) {
const { batchSize = 100 } = config;
// https://nodejs.org/api/stream.html#stream_new_stream_readable_options
super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
this.cursor = new Cursor(text, values, config)

// delegate Submittable callbacks to cursor
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
Expand All @@ -19,40 +17,25 @@ class PgQueryStream extends Readable {
this.handleError = this.cursor.handleError.bind(this.cursor)
}

submit (connection) {
submit(connection) {
this.cursor.submit(connection)
}

close (callback) {
this._closed = true
const cb = callback || (() => this.emit('close'))
this.cursor.close(cb)
_destroy(_err, cb) {
this.cursor.close((err) => {
cb(err || _err)
})
}

_read (size) {
if (this._reading || this._closed) {
return false
}
this._reading = true
const readAmount = Math.max(size, this.batchSize)
this.cursor.read(readAmount, (err, rows) => {
if (this._closed) {
return
}
// https://nodejs.org/api/stream.html#stream_readable_read_size_1
_read(size) {
this.cursor.read(size, (err, rows, result) => {
if (err) {
return this.emit('error', err)
}
// if we get a 0 length array we've read to the end of the cursor
if (!rows.length) {
this._closed = true
setImmediate(() => this.emit('close'))
return this.push(null)
}

// push each row into the stream
this._reading = false
for (var i = 0; i < rows.length; i++) {
this.push(rows[i])
// https://nodejs.org/api/stream.html#stream_errors_while_reading
this.destroy(err)
} else {
for (const row of rows) this.push(row)
if (rows.length < size) this.push(null)
}
})
}
Expand Down
55 changes: 55 additions & 0 deletions packages/pg-query-stream/test/async-iterator.es6
Expand Up @@ -54,4 +54,59 @@ describe('Async iterator', () => {
assert.equal(allRows.length, 603)
await pool.end()
})

it('can break out of iteration early', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
rows.push(row)
break;
}
for await (const row of client.query(new QueryStream(queryText, []))) {
rows.push(row)
break;
}
for await (const row of client.query(new QueryStream(queryText, []))) {
rows.push(row)
break;
}
assert.strictEqual(rows.length, 3)
client.release()
await pool.end()
})

it('only returns rows on first iteration', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
const stream = client.query(new QueryStream(queryText, []))
for await (const row of stream) {
rows.push(row)
break;
}
for await (const row of stream) {
rows.push(row)
}
for await (const row of stream) {
rows.push(row)
}
assert.strictEqual(rows.length, 1)
client.release()
await pool.end()
})

it('can read with delays', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
for await (const row of stream) {
rows.push(row)
await new Promise((resolve) => setTimeout(resolve, 1))
}
assert.strictEqual(rows.length, 201)
client.release()
await pool.end()
})
})
74 changes: 55 additions & 19 deletions packages/pg-query-stream/test/close.js
Expand Up @@ -4,49 +4,85 @@ var concat = require('concat-stream')
var QueryStream = require('../')
var helper = require('./helper')

if (process.version.startsWith('v8.')) {
return console.error('warning! node versions less than 10lts no longer supported & stream closing semantics may not behave properly');
}

helper('close', function (client) {
it('emits close', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 })
var query = client.query(stream)
query.pipe(concat(function () {}))
query.pipe(concat(function () { }))
query.on('close', done)
})
})

helper('early close', function (client) {
it('can be closed early', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], { batchSize: 2, highWaterMark: 2 })
var query = client.query(stream)
var readCount = 0
query.on('readable', function () {
readCount++
query.read()
})
query.once('readable', function () {
query.close()
query.destroy()
})
query.on('close', function () {
assert(readCount < 10, 'should not have read more than 10 rows')
done()
})
})
})

helper('close callback', function (client) {
it('notifies an optional callback when the conneciton is closed', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
var query = client.query(stream)
query.once('readable', function () { // only reading once
query.read()
})
query.once('readable', function () {
query.close(function () {
// nothing to assert. This test will time out if the callback does not work.
done()
it('can destroy stream while reading', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
client.query(stream)
stream.on('data', () => done(new Error('stream should not have returned rows')))
setTimeout(() => {
stream.destroy()
stream.on('close', done)
}, 100)
})

it('emits an error when calling destroy with an error', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
client.query(stream)
stream.on('data', () => done(new Error('stream should not have returned rows')))
setTimeout(() => {
stream.destroy(new Error('intentional error'))
stream.on('error', (err) => {
// make sure there's an error
assert(err);
assert.strictEqual(err.message, 'intentional error');
done();
})
}, 100)
})

it('can destroy stream while reading an error', function (done) {
var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;')
client.query(stream)
stream.on('data', () => done(new Error('stream should not have returned rows')))
stream.once('error', () => {
stream.destroy()
// wait a bit to let any other errors shake through
setTimeout(done, 100)
})
query.on('close', function () {
assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
})
})

it('does not crash when destroying the stream immediately after calling read', function (done) {
var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
client.query(stream)
stream.on('data', () => done(new Error('stream should not have returned rows')))
stream.destroy()
stream.on('close', done)
})

it('does not crash when destroying the stream before its submitted', function (done) {
var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
stream.on('data', () => done(new Error('stream should not have returned rows')))
stream.destroy()
stream.on('close', done)
})
})
4 changes: 1 addition & 3 deletions packages/pg-query-stream/test/config.js
Expand Up @@ -2,9 +2,7 @@ var assert = require('assert')
var QueryStream = require('../')

var stream = new QueryStream('SELECT NOW()', [], {
highWaterMark: 999,
batchSize: 88
})

assert.equal(stream._readableState.highWaterMark, 999)
assert.equal(stream.batchSize, 88)
assert.equal(stream._readableState.highWaterMark, 88)