Skip to content

Commit

Permalink
Give same error to queued queries as to active query when ending
Browse files Browse the repository at this point in the history
and do so in the native Client as well.
  • Loading branch information
charmander committed May 5, 2018
1 parent 99ba060 commit 705d89c
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 74 deletions.
45 changes: 24 additions & 21 deletions lib/client.js
Expand Up @@ -53,6 +53,22 @@ var Client = function (config) {

util.inherits(Client, EventEmitter)

Client.prototype._errorAllQueries = function (err) {
const enqueueError = (query) => {
process.nextTick(() => {
query.handleError(err, this.connection)
})
}

if (this.activeQuery) {
enqueueError(this.activeQuery)
this.activeQuery = null
}

this.queryQueue.forEach(enqueueError)
this.queryQueue.length = 0
}

Client.prototype._connect = function (callback) {
var self = this
var con = this.connection
Expand Down Expand Up @@ -127,21 +143,7 @@ Client.prototype._connect = function (callback) {

const connectedErrorHandler = (err) => {
this._queryable = false

const enqueueError = (query) => {
process.nextTick(() => {
query.handleError(err, con)
})
}

if (this.activeQuery) {
enqueueError(this.activeQuery)
this.activeQuery = null
}

this.queryQueue.forEach(enqueueError)
this.queryQueue = []

this._errorAllQueries(err)
this.emit('error', err)
}

Expand Down Expand Up @@ -192,17 +194,17 @@ Client.prototype._connect = function (callback) {
})

con.once('end', () => {
if (this.activeQuery) {
var disconnectError = new Error('Connection terminated')
this.activeQuery.handleError(disconnectError, con)
this.activeQuery = null
}
const error = this._ending
? new Error('Connection terminated')
: new Error('Connection terminated unexpectedly')

this._errorAllQueries(error)

if (!this._ending) {
// if the connection is ended without us calling .end()
// on this client then we have an unexpected disconnection
// treat this as an error unless we've already emitted an error
// during connection.
const error = new Error('Connection terminated unexpectedly')
if (this._connecting && !this._connectionError) {
if (callback) {
callback(error)
Expand All @@ -213,6 +215,7 @@ Client.prototype._connect = function (callback) {
connectedErrorHandler(error)
}
}

this.emit('end')
})

Expand Down
121 changes: 75 additions & 46 deletions lib/native/client.js
Expand Up @@ -32,8 +32,10 @@ var Client = module.exports = function (config) {
})

this._queryQueue = []
this._connected = false
this._ending = false
this._connecting = false
this._connected = false
this._queryable = true

// keep these on the object for legacy reasons
// for the time being. TODO: deprecate all this jazz
Expand All @@ -52,50 +54,48 @@ Client.Query = NativeQuery

util.inherits(Client, EventEmitter)

Client.prototype._errorAllQueries = function (err) {
const enqueueError = (query) => {
process.nextTick(() => {
query.native = this.native
query.handleError(err)
})
}

if (this._hasActiveQuery()) {
enqueueError(this._activeQuery)
this._activeQuery = null
}

this._queryQueue.forEach(enqueueError)
this._queryQueue.length = 0
}

// connect to the backend
// pass an optional callback to be called once connected
// or with an error if there was a connection error
// if no callback is passed and there is a connection error
// the client will emit an error event.
Client.prototype.connect = function (cb) {
Client.prototype._connect = function (cb) {
var self = this

var onError = function (err) {
if (cb) return cb(err)
return self.emit('error', err)
}

var result
if (!cb) {
var resolveOut, rejectOut
cb = (err) => err ? rejectOut(err) : resolveOut()
result = new global.Promise(function (resolve, reject) {
resolveOut = resolve
rejectOut = reject
})
}

if (this._connecting) {
process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
return result
return
}

this._connecting = true

this.connectionParameters.getLibpqConnectionString(function (err, conString) {
if (err) return onError(err)
if (err) return cb(err)
self.native.connect(conString, function (err) {
if (err) return onError(err)
if (err) return cb(err)

// set internal states to connected
self._connected = true

// handle connection errors from the native layer
self.native.on('error', function (err) {
// error will be handled by active query
if (self._activeQuery && self._activeQuery.state !== 'end') {
return
}
self._queryable = false
self._errorAllQueries(err)
self.emit('error', err)
})

Expand All @@ -110,12 +110,26 @@ Client.prototype.connect = function (cb) {
self.emit('connect')
self._pulseQueryQueue(true)

// possibly call the optional callback
if (cb) cb()
cb()
})
})
}

return result
Client.prototype.connect = function (callback) {
if (callback) {
this._connect(callback)
return
}

return new Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve()
}
})
})
}

// send a query to the server
Expand All @@ -129,26 +143,43 @@ Client.prototype.connect = function (cb) {
// optional string rowMode = 'array' for an array of results
// }
Client.prototype.query = function (config, values, callback) {
var query
var result

if (typeof config.submit === 'function') {
result = query = config
// accept query(new Query(...), (err, res) => { }) style
if (typeof values === 'function') {
config.callback = values
}
this._queryQueue.push(config)
this._pulseQueryQueue()
return config
} else {
query = new NativeQuery(config, values, callback)
if (!query.callback) {
let resolveOut, rejectOut
result = new Promise((resolve, reject) => {
resolveOut = resolve
rejectOut = reject
})
query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res)
}
}

var query = new NativeQuery(config, values, callback)
var result
if (!query.callback) {
let resolveOut, rejectOut
result = new Promise((resolve, reject) => {
resolveOut = resolve
rejectOut = reject
if (!this._queryable) {
query.native = this.native
process.nextTick(() => {
query.handleError(new Error('Client has encountered a connection error and is not queryable'))
})
return result
}

if (this._ending) {
query.native = this.native
process.nextTick(() => {
query.handleError(new Error('Client was closed and is not queryable'))
})
query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res)
return result
}

this._queryQueue.push(query)
this._pulseQueryQueue()
return result
Expand All @@ -157,6 +188,9 @@ Client.prototype.query = function (config, values, callback) {
// disconnect from the backend server
Client.prototype.end = function (cb) {
var self = this

this._ending = true

if (!this._connected) {
this.once('connect', this.end.bind(this, cb))
}
Expand All @@ -170,12 +204,7 @@ Client.prototype.end = function (cb) {
})
}
this.native.end(function () {
// send an error to the active query
if (self._hasActiveQuery()) {
var msg = 'Connection terminated'
self._queryQueue.length = 0
self._activeQuery.handleError(new Error(msg))
}
self._errorAllQueries(new Error('Connection terminated'))
self.emit('end')
if (cb) cb()
})
Expand Down
5 changes: 1 addition & 4 deletions test/integration/client/error-handling-tests.js
Expand Up @@ -73,12 +73,9 @@ suite.test('query receives error on client shutdown', function (done) {
client.query(new pg.Query(config), assert.calls(function (err, res) {
assert(err instanceof Error)
queryError = err
done()
}))
setTimeout(() => client.end(), 50)
client.once('end', () => {
assert(queryError instanceof Error)
done()
})
}))
})

Expand Down
18 changes: 15 additions & 3 deletions test/integration/connection-pool/error-tests.js
Expand Up @@ -52,15 +52,23 @@ suite.test('connection-level errors cause queued queries to fail', (cb) => {
const pool = new pg.Pool()
pool.connect(assert.success((client, done) => {
client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => {
assert.equal(err.code, '57P01')
if (helper.args.native) {
assert.ok(err)
} else {
assert.equal(err.code, '57P01')
}
}))

pool.once('error', assert.calls((err, brokenClient) => {
assert.equal(client, brokenClient)
}))

client.query('SELECT 1', assert.calls((err) => {
assert.equal(err.message, 'Connection terminated unexpectedly')
if (helper.args.native) {
assert.ok(/^server closed the connection unexpectedly/.test(err.message))
} else {
assert.equal(err.message, 'Connection terminated unexpectedly')
}

done()
pool.end()
Expand All @@ -73,7 +81,11 @@ suite.test('connection-level errors cause future queries to fail', (cb) => {
const pool = new pg.Pool()
pool.connect(assert.success((client, done) => {
client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => {
assert.equal(err.code, '57P01')
if (helper.args.native) {
assert.ok(err)
} else {
assert.equal(err.code, '57P01')
}
}))

pool.once('error', assert.calls((err, brokenClient) => {
Expand Down

0 comments on commit 705d89c

Please sign in to comment.