Skip to content

Commit

Permalink
Merge branch 'brianc:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
zlotnika committed Oct 21, 2021
2 parents b31db25 + 947ccee commit 4fab71e
Show file tree
Hide file tree
Showing 17 changed files with 215 additions and 50 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,15 @@ For richer information consult the commit log on github with referenced pull req

We do not include break-fix version release in this file.

### pg@8.7.0

- Add optional config to [pool](https://github.com/brianc/node-postgres/pull/2568) to allow process to exit if pool is idle.

### pg-cursor@2.7.0

- Convert to [es6 class](https://github.com/brianc/node-postgres/pull/2553)
- Add support for promises [to cursor methods](https://github.com/brianc/node-postgres/pull/2554)

### pg@8.6.0

- Better [SASL](https://github.com/brianc/node-postgres/pull/2436) error messages & more validation on bad configuration.
Expand Down
55 changes: 35 additions & 20 deletions packages/pg-cursor/index.js
Expand Up @@ -17,6 +17,7 @@ class Cursor extends EventEmitter {
this._queue = []
this.state = 'initialized'
this._result = new Result(this._conf.rowMode, this._conf.types)
this._Promise = this._conf.Promise || global.Promise
this._cb = null
this._rows = null
this._portal = null
Expand Down Expand Up @@ -198,38 +199,52 @@ class Cursor extends EventEmitter {
}

close(cb) {
let promise

if (!cb) {
promise = new this._Promise((resolve, reject) => {
cb = (err) => (err ? reject(err) : resolve())
})
}

if (!this.connection || this.state === 'done') {
if (cb) {
return setImmediate(cb)
} else {
return
}
setImmediate(cb)
return promise
}

this._closePortal()
this.state = 'done'
if (cb) {
this.connection.once('readyForQuery', function () {
cb()
})
}
this.connection.once('readyForQuery', function () {
cb()
})

// Return the promise (or undefined)
return promise
}

read(rows, cb) {
if (this.state === 'idle' || this.state === 'submitted') {
return this._getRows(rows, cb)
}
if (this.state === 'busy' || this.state === 'initialized') {
return this._queue.push([rows, cb])
}
if (this.state === 'error') {
return setImmediate(() => cb(this._error))
let promise

if (!cb) {
promise = new this._Promise((resolve, reject) => {
cb = (err, rows) => (err ? reject(err) : resolve(rows))
})
}
if (this.state === 'done') {
return setImmediate(() => cb(null, []))

if (this.state === 'idle' || this.state === 'submitted') {
this._getRows(rows, cb)
} else if (this.state === 'busy' || this.state === 'initialized') {
this._queue.push([rows, cb])
} else if (this.state === 'error') {
setImmediate(() => cb(this._error))
} else if (this.state === 'done') {
setImmediate(() => cb(null, []))
} else {
throw new Error('Unknown state: ' + this.state)
}

// Return the promise (or undefined)
return promise
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/pg-cursor/package.json
@@ -1,6 +1,6 @@
{
"name": "pg-cursor",
"version": "2.6.0",
"version": "2.7.1",
"description": "Query cursor extension for node-postgres",
"main": "index.js",
"directories": {
Expand All @@ -18,7 +18,7 @@
"license": "MIT",
"devDependencies": {
"mocha": "^7.1.2",
"pg": "^8.6.0"
"pg": "^8.7.1"
},
"peerDependencies": {
"pg": "^8"
Expand Down
11 changes: 11 additions & 0 deletions packages/pg-cursor/test/close.js
Expand Up @@ -23,6 +23,17 @@ describe('close', function () {
})
})

it('can close a finished cursor a promise', function (done) {
const cursor = new Cursor(text)
this.client.query(cursor)
cursor.read(100, (err) => {
assert.ifError(err)
cursor.close().then(() => {
this.client.query('SELECT NOW()', done)
})
})
})

it('closes cursor early', function (done) {
const cursor = new Cursor(text)
this.client.query(cursor)
Expand Down
51 changes: 51 additions & 0 deletions packages/pg-cursor/test/promises.js
@@ -0,0 +1,51 @@
const assert = require('assert')
const Cursor = require('../')
const pg = require('pg')

const text = 'SELECT generate_series as num FROM generate_series(0, 5)'

describe('cursor using promises', function () {
beforeEach(function (done) {
const client = (this.client = new pg.Client())
client.connect(done)

this.pgCursor = function (text, values) {
return client.query(new Cursor(text, values || []))
}
})

afterEach(function () {
this.client.end()
})

it('resolve with result', async function () {
const cursor = this.pgCursor(text)
const res = await cursor.read(6)
assert.strictEqual(res.length, 6)
})

it('reject with error', function (done) {
const cursor = this.pgCursor('select asdfasdf')
cursor.read(1).catch((err) => {
assert(err)
done()
})
})

it('read multiple times', async function () {
const cursor = this.pgCursor(text)
let res

res = await cursor.read(2)
assert.strictEqual(res.length, 2)

res = await cursor.read(3)
assert.strictEqual(res.length, 3)

res = await cursor.read(1)
assert.strictEqual(res.length, 1)

res = await cursor.read(1)
assert.strictEqual(res.length, 0)
})
})
11 changes: 11 additions & 0 deletions packages/pg-pool/index.js
Expand Up @@ -83,6 +83,7 @@ class Pool extends EventEmitter {

this.options.max = this.options.max || this.options.poolSize || 10
this.options.maxUses = this.options.maxUses || Infinity
this.options.allowExitOnIdle = this.options.allowExitOnIdle || false
this.log = this.options.log || function () {}
this.Client = this.options.Client || Client || require('pg').Client
this.Promise = this.options.Promise || global.Promise
Expand Down Expand Up @@ -136,6 +137,7 @@ class Pool extends EventEmitter {
const idleItem = this._idle.pop()
clearTimeout(idleItem.timeoutId)
const client = idleItem.client
client.ref && client.ref()
const idleListener = idleItem.idleListener

return this._acquireClient(client, pendingItem, idleListener, false)
Expand Down Expand Up @@ -323,6 +325,15 @@ class Pool extends EventEmitter {
this.log('remove idle client')
this._remove(client)
}, this.options.idleTimeoutMillis)

if (this.options.allowExitOnIdle) {
// allow Node to exit if this is all that's left
tid.unref()
}
}

if (this.options.allowExitOnIdle) {
client.unref()
}

this._idle.push(new IdleItem(client, idleListener, tid))
Expand Down
2 changes: 1 addition & 1 deletion packages/pg-pool/package.json
@@ -1,6 +1,6 @@
{
"name": "pg-pool",
"version": "3.3.0",
"version": "3.4.1",
"description": "Connection pool for node-postgres",
"main": "index.js",
"directories": {
Expand Down
16 changes: 16 additions & 0 deletions packages/pg-pool/test/idle-timeout-exit.js
@@ -0,0 +1,16 @@
// This test is meant to be spawned from idle-timeout.js
if (module === require.main) {
const allowExitOnIdle = process.env.ALLOW_EXIT_ON_IDLE === '1'
const Pool = require('../index')

const pool = new Pool({ idleTimeoutMillis: 200, ...(allowExitOnIdle ? { allowExitOnIdle: true } : {}) })
pool.query('SELECT NOW()', (err, res) => console.log('completed first'))
pool.on('remove', () => {
console.log('removed')
done()
})

setTimeout(() => {
pool.query('SELECT * from generate_series(0, 1000)', (err, res) => console.log('completed second'))
}, 50)
}
31 changes: 31 additions & 0 deletions packages/pg-pool/test/idle-timeout.js
Expand Up @@ -4,6 +4,8 @@ const expect = require('expect.js')

const describe = require('mocha').describe
const it = require('mocha').it
const { fork } = require('child_process')
const path = require('path')

const Pool = require('../')

Expand Down Expand Up @@ -84,4 +86,33 @@ describe('idle timeout', () => {
return pool.end()
})
)

it('unrefs the connections and timeouts so the program can exit when idle when the allowExitOnIdle option is set', function (done) {
const child = fork(path.join(__dirname, 'idle-timeout-exit.js'), [], {
silent: true,
env: { ...process.env, ALLOW_EXIT_ON_IDLE: '1' },
})
let result = ''
child.stdout.setEncoding('utf8')
child.stdout.on('data', (chunk) => (result += chunk))
child.on('error', (err) => done(err))
child.on('close', () => {
expect(result).to.equal('completed first\ncompleted second\n')
done()
})
})

it('keeps old behavior when allowExitOnIdle option is not set', function (done) {
const child = fork(path.join(__dirname, 'idle-timeout-exit.js'), [], {
silent: true,
})
let result = ''
child.stdout.setEncoding('utf8')
child.stdout.on('data', (chunk) => (result += chunk))
child.on('error', (err) => done(err))
child.on('close', () => {
expect(result).to.equal('completed first\ncompleted second\nremoved\n')
done()
})
})
})
6 changes: 3 additions & 3 deletions packages/pg-query-stream/package.json
@@ -1,6 +1,6 @@
{
"name": "pg-query-stream",
"version": "4.1.0",
"version": "4.2.1",
"description": "Postgres query result returned as readable stream",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down Expand Up @@ -37,13 +37,13 @@
"concat-stream": "~1.0.1",
"eslint-plugin-promise": "^3.5.0",
"mocha": "^7.1.2",
"pg": "^8.6.0",
"pg": "^8.7.1",
"stream-spec": "~0.3.5",
"stream-tester": "0.0.5",
"ts-node": "^8.5.4",
"typescript": "^4.0.3"
},
"dependencies": {
"pg-cursor": "^2.6.0"
"pg-cursor": "^2.7.1"
}
}
8 changes: 8 additions & 0 deletions packages/pg/lib/client.js
Expand Up @@ -577,6 +577,14 @@ class Client extends EventEmitter {
return result
}

ref() {
this.connection.ref()
}

unref() {
this.connection.unref()
}

end(cb) {
this._ending = true

Expand Down
8 changes: 8 additions & 0 deletions packages/pg/lib/connection.js
Expand Up @@ -177,6 +177,14 @@ class Connection extends EventEmitter {
this._send(syncBuffer)
}

ref() {
this.stream.ref()
}

unref() {
this.stream.unref()
}

end() {
// 0x58 = 'X'
this._ending = true
Expand Down
3 changes: 3 additions & 0 deletions packages/pg/lib/native/client.js
Expand Up @@ -285,6 +285,9 @@ Client.prototype.cancel = function (query) {
}
}

Client.prototype.ref = function () {}
Client.prototype.unref = function () {}

Client.prototype.setTypeParser = function (oid, format, parseFn) {
return this._types.setTypeParser(oid, format, parseFn)
}
Expand Down
4 changes: 2 additions & 2 deletions packages/pg/package.json
@@ -1,6 +1,6 @@
{
"name": "pg",
"version": "8.6.0",
"version": "8.7.1",
"description": "PostgreSQL client - pure javascript & libpq with the same API",
"keywords": [
"database",
Expand All @@ -23,7 +23,7 @@
"buffer-writer": "2.0.0",
"packet-reader": "1.0.0",
"pg-connection-string": "^2.5.0",
"pg-pool": "^3.3.0",
"pg-pool": "^3.4.1",
"pg-protocol": "^1.5.0",
"pg-types": "^2.1.0",
"pgpass": "1.x"
Expand Down

0 comments on commit 4fab71e

Please sign in to comment.