From 3828aa86081491282c118884c245b4a19619f02e Mon Sep 17 00:00:00 2001 From: Charmander <~@charmander.me> Date: Wed, 3 Oct 2018 08:37:15 -0700 Subject: [PATCH] Queued query errors (#1503) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add tests for query callbacks after connection-level errors * Ensure callbacks are executed for all queued queries after connection-level errors Separates socket errors from error messages, sends socket errors to all queries in the queue, marks clients as unusable after socket errors. This is not very pleasant but should maintain backwards compatibility…? * Always call `handleError` asynchronously This doesn’t match the original behaviour of the type errors, but it’s correct. * Fix return value of `Client.prototype.query` in immediate error cases * Mark clients with closed connections as unusable consistently * Add tests for error event when connecting Client * Ensure the promise and callback versions of Client#connect always have the same behaviour * Give same error to queued queries as to active query when ending and do so in the native Client as well. * Restore original ordering between queued query callbacks and 'end' event --- lib/client.js | 125 ++++++++++++----- lib/connection.js | 3 +- lib/native/client.js | 128 +++++++++++------- lib/query.js | 11 +- .../client/error-handling-tests.js | 18 +++ .../connection-pool/error-tests.js | 59 +++++++- 6 files changed, 252 insertions(+), 92 deletions(-) diff --git a/lib/client.js b/lib/client.js index feedebbd8..432443793 100644 --- a/lib/client.js +++ b/lib/client.js @@ -36,6 +36,7 @@ var Client = function (config) { this._connecting = false this._connected = false this._connectionError = false + this._queryable = true this.connection = c.connection || new Connection({ stream: c.stream, @@ -52,16 +53,31 @@ var Client = function (config) { util.inherits(Client, EventEmitter) -Client.prototype.connect = function (callback) { +Client.prototype._errorAllQueries = function (err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.handleError(err, this.connection) + }) + } + + if (this.activeQuery) { + enqueueError(this.activeQuery) + this.activeQuery = null + } + + this.queryQueue.forEach(enqueueError) + this.queryQueue.length = 0 +} + +Client.prototype._connect = function (callback) { var self = this var con = this.connection if (this._connecting || this._connected) { const err = new Error('Client has already been connected. You cannot reuse a client.') - if (callback) { + process.nextTick(() => { callback(err) - return undefined - } - return Promise.reject(err) + }) + return } this._connecting = true @@ -126,15 +142,25 @@ Client.prototype.connect = function (callback) { } const connectedErrorHandler = (err) => { - if (this.activeQuery) { - var activeQuery = self.activeQuery - this.activeQuery = null - return activeQuery.handleError(err, con) - } + this._queryable = false + this._errorAllQueries(err) this.emit('error', err) } + const connectedErrorMessageHandler = (msg) => { + const activeQuery = this.activeQuery + + if (!activeQuery) { + connectedErrorHandler(msg) + return + } + + this.activeQuery = null + activeQuery.handleError(msg, con) + } + con.on('error', connectingErrorHandler) + con.on('errorMessage', connectingErrorHandler) // hook up query handling events to connection // after the connection initially becomes ready for queries @@ -143,7 +169,9 @@ Client.prototype.connect = function (callback) { self._connected = true self._attachListeners(con) con.removeListener('error', connectingErrorHandler) + con.removeListener('errorMessage', connectingErrorHandler) con.on('error', connectedErrorHandler) + con.on('errorMessage', connectedErrorMessageHandler) // process possible callback argument to Client#connect if (callback) { @@ -166,43 +194,53 @@ Client.prototype.connect = function (callback) { }) con.once('end', () => { - if (this.activeQuery) { - var disconnectError = new Error('Connection terminated') - this.activeQuery.handleError(disconnectError, con) - this.activeQuery = null - } + const error = this._ending + ? new Error('Connection terminated') + : new Error('Connection terminated unexpectedly') + + this._errorAllQueries(error) + if (!this._ending) { // if the connection is ended without us calling .end() // on this client then we have an unexpected disconnection // treat this as an error unless we've already emitted an error // during connection. - const error = new Error('Connection terminated unexpectedly') if (this._connecting && !this._connectionError) { if (callback) { callback(error) } else { - this.emit('error', error) + connectedErrorHandler(error) } } else if (!this._connectionError) { - this.emit('error', error) + connectedErrorHandler(error) } } - this.emit('end') + + process.nextTick(() => { + this.emit('end') + }) }) con.on('notice', function (msg) { self.emit('notice', msg) }) +} - if (!callback) { - return new global.Promise((resolve, reject) => { - this.once('error', reject) - this.once('connect', () => { - this.removeListener('error', reject) +Client.prototype.connect = function (callback) { + if (callback) { + this._connect(callback) + return + } + + return new Promise((resolve, reject) => { + this._connect((error) => { + if (error) { + reject(error) + } else { resolve() - }) + } }) - } + }) } Client.prototype._attachListeners = function (con) { @@ -340,7 +378,15 @@ Client.prototype._pulseQueryQueue = function () { if (this.activeQuery) { this.readyForQuery = false this.hasExecuted = true - this.activeQuery.submit(this.connection) + + const queryError = this.activeQuery.submit(this.connection) + if (queryError) { + process.nextTick(() => { + this.activeQuery.handleError(queryError, this.connection) + this.readyForQuery = true + this._pulseQueryQueue() + }) + } } else if (this.hasExecuted) { this.activeQuery = null this.emit('drain') @@ -379,6 +425,20 @@ Client.prototype.query = function (config, values, callback) { query._result._getTypeParser = this._types.getTypeParser.bind(this._types) } + if (!this._queryable) { + process.nextTick(() => { + query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) + }) + return result + } + + if (this._ending) { + process.nextTick(() => { + query.handleError(new Error('Client was closed and is not queryable'), this.connection) + }) + return result + } + this.queryQueue.push(query) this._pulseQueryQueue() return result @@ -386,18 +446,19 @@ Client.prototype.query = function (config, values, callback) { Client.prototype.end = function (cb) { this._ending = true + if (this.activeQuery) { // if we have an active query we need to force a disconnect // on the socket - otherwise a hung query could block end forever - this.connection.stream.destroy(new Error('Connection terminated by user')) - return cb ? cb() : Promise.resolve() + this.connection.stream.destroy() + } else { + this.connection.end() } + if (cb) { - this.connection.end() this.connection.once('end', cb) } else { - return new global.Promise((resolve, reject) => { - this.connection.end() + return new Promise((resolve) => { this.connection.once('end', resolve) }) } diff --git a/lib/connection.js b/lib/connection.js index 799ab4ed8..177739c32 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -117,10 +117,11 @@ Connection.prototype.attachListeners = function (stream) { var packet = self._reader.read() while (packet) { var msg = self.parseMessage(packet) + var eventName = msg.name === 'error' ? 'errorMessage' : msg.name if (self._emitMessage) { self.emit('message', msg) } - self.emit(msg.name, msg) + self.emit(eventName, msg) packet = self._reader.read() } }) diff --git a/lib/native/client.js b/lib/native/client.js index bed548ad8..b18ff6ffa 100644 --- a/lib/native/client.js +++ b/lib/native/client.js @@ -32,8 +32,10 @@ var Client = module.exports = function (config) { }) this._queryQueue = [] - this._connected = false + this._ending = false this._connecting = false + this._connected = false + this._queryable = true // keep these on the object for legacy reasons // for the time being. TODO: deprecate all this jazz @@ -52,50 +54,48 @@ Client.Query = NativeQuery util.inherits(Client, EventEmitter) +Client.prototype._errorAllQueries = function (err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.native = this.native + query.handleError(err) + }) + } + + if (this._hasActiveQuery()) { + enqueueError(this._activeQuery) + this._activeQuery = null + } + + this._queryQueue.forEach(enqueueError) + this._queryQueue.length = 0 +} + // connect to the backend // pass an optional callback to be called once connected // or with an error if there was a connection error -// if no callback is passed and there is a connection error -// the client will emit an error event. -Client.prototype.connect = function (cb) { +Client.prototype._connect = function (cb) { var self = this - var onError = function (err) { - if (cb) return cb(err) - return self.emit('error', err) - } - - var result - if (!cb) { - var resolveOut, rejectOut - cb = (err) => err ? rejectOut(err) : resolveOut() - result = new global.Promise(function (resolve, reject) { - resolveOut = resolve - rejectOut = reject - }) - } - if (this._connecting) { process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.'))) - return result + return } this._connecting = true this.connectionParameters.getLibpqConnectionString(function (err, conString) { - if (err) return onError(err) + if (err) return cb(err) self.native.connect(conString, function (err) { - if (err) return onError(err) + if (err) return cb(err) // set internal states to connected self._connected = true // handle connection errors from the native layer self.native.on('error', function (err) { - // error will be handled by active query - if (self._activeQuery && self._activeQuery.state !== 'end') { - return - } + self._queryable = false + self._errorAllQueries(err) self.emit('error', err) }) @@ -110,12 +110,26 @@ Client.prototype.connect = function (cb) { self.emit('connect') self._pulseQueryQueue(true) - // possibly call the optional callback - if (cb) cb() + cb() }) }) +} - return result +Client.prototype.connect = function (callback) { + if (callback) { + this._connect(callback) + return + } + + return new Promise((resolve, reject) => { + this._connect((error) => { + if (error) { + reject(error) + } else { + resolve() + } + }) + }) } // send a query to the server @@ -129,26 +143,43 @@ Client.prototype.connect = function (cb) { // optional string rowMode = 'array' for an array of results // } Client.prototype.query = function (config, values, callback) { + var query + var result + if (typeof config.submit === 'function') { + result = query = config // accept query(new Query(...), (err, res) => { }) style if (typeof values === 'function') { config.callback = values } - this._queryQueue.push(config) - this._pulseQueryQueue() - return config + } else { + query = new NativeQuery(config, values, callback) + if (!query.callback) { + let resolveOut, rejectOut + result = new Promise((resolve, reject) => { + resolveOut = resolve + rejectOut = reject + }) + query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res) + } } - var query = new NativeQuery(config, values, callback) - var result - if (!query.callback) { - let resolveOut, rejectOut - result = new Promise((resolve, reject) => { - resolveOut = resolve - rejectOut = reject + if (!this._queryable) { + query.native = this.native + process.nextTick(() => { + query.handleError(new Error('Client has encountered a connection error and is not queryable')) + }) + return result + } + + if (this._ending) { + query.native = this.native + process.nextTick(() => { + query.handleError(new Error('Client was closed and is not queryable')) }) - query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res) + return result } + this._queryQueue.push(query) this._pulseQueryQueue() return result @@ -157,6 +188,9 @@ Client.prototype.query = function (config, values, callback) { // disconnect from the backend server Client.prototype.end = function (cb) { var self = this + + this._ending = true + if (!this._connected) { this.once('connect', this.end.bind(this, cb)) } @@ -170,14 +204,12 @@ Client.prototype.end = function (cb) { }) } this.native.end(function () { - // send an error to the active query - if (self._hasActiveQuery()) { - var msg = 'Connection terminated' - self._queryQueue.length = 0 - self._activeQuery.handleError(new Error(msg)) - } - self.emit('end') - if (cb) cb() + self._errorAllQueries(new Error('Connection terminated')) + + process.nextTick(() => { + self.emit('end') + if (cb) cb() + }) }) return result } diff --git a/lib/query.js b/lib/query.js index fe82061e3..94c2bc3c1 100644 --- a/lib/query.js +++ b/lib/query.js @@ -146,22 +146,17 @@ Query.prototype.handleError = function (err, connection) { Query.prototype.submit = function (connection) { if (typeof this.text !== 'string' && typeof this.name !== 'string') { - const err = new Error('A query must have either text or a name. Supplying neither is unsupported.') - connection.emit('error', err) - connection.emit('readyForQuery') - return + return new Error('A query must have either text or a name. Supplying neither is unsupported.') } if (this.values && !Array.isArray(this.values)) { - const err = new Error('Query values must be an array') - connection.emit('error', err) - connection.emit('readyForQuery') - return + return new Error('Query values must be an array') } if (this.requiresPreparation()) { this.prepare(connection) } else { connection.query(this.text) } + return null } Query.prototype.hasBeenParsed = function (connection) { diff --git a/test/integration/client/error-handling-tests.js b/test/integration/client/error-handling-tests.js index 55372d141..97b0ce83f 100644 --- a/test/integration/client/error-handling-tests.js +++ b/test/integration/client/error-handling-tests.js @@ -50,6 +50,18 @@ suite.test('re-using connections results in promise rejection', (done) => { }) }) +suite.test('using a client after closing it results in error', (done) => { + const client = new Client() + client.connect(() => { + client.end(assert.calls(() => { + client.query('SELECT 1', assert.calls((err) => { + assert.equal(err.message, 'Client was closed and is not queryable') + done() + })) + })) + }) +}) + suite.test('query receives error on client shutdown', function (done) { var client = new Client() client.connect(assert.success(function () { @@ -139,6 +151,9 @@ suite.test('when connecting to an invalid host with callback', function (done) { var client = new Client({ user: 'very invalid username' }) + client.on('error', () => { + assert.fail('unexpected error event when connecting') + }) client.connect(function (error, client) { assert(error instanceof Error) done() @@ -149,6 +164,9 @@ suite.test('when connecting to invalid host with promise', function (done) { var client = new Client({ user: 'very invalid username' }) + client.on('error', () => { + assert.fail('unexpected error event when connecting') + }) client.connect().catch((e) => done()) }) diff --git a/test/integration/connection-pool/error-tests.js b/test/integration/connection-pool/error-tests.js index 17cdcc47f..cadffe3db 100644 --- a/test/integration/connection-pool/error-tests.js +++ b/test/integration/connection-pool/error-tests.js @@ -2,9 +2,6 @@ var helper = require('./test-helper') const pg = helper.pg -// make pool hold 2 clients -const pool = new pg.Pool({ max: 2 }) - const suite = new helper.Suite() suite.test('connecting to invalid port', (cb) => { const pool = new pg.Pool({ port: 13801 }) @@ -12,6 +9,8 @@ suite.test('connecting to invalid port', (cb) => { }) suite.test('errors emitted on pool', (cb) => { + // make pool hold 2 clients + const pool = new pg.Pool({ max: 2 }) // get first client pool.connect(assert.success(function (client, done) { client.id = 1 @@ -46,3 +45,57 @@ suite.test('errors emitted on pool', (cb) => { }) })) }) + +suite.test('connection-level errors cause queued queries to fail', (cb) => { + const pool = new pg.Pool() + pool.connect(assert.success((client, done) => { + client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => { + if (helper.args.native) { + assert.ok(err) + } else { + assert.equal(err.code, '57P01') + } + })) + + pool.once('error', assert.calls((err, brokenClient) => { + assert.equal(client, brokenClient) + })) + + client.query('SELECT 1', assert.calls((err) => { + if (helper.args.native) { + assert.ok(err) + } else { + assert.equal(err.message, 'Connection terminated unexpectedly') + } + + done() + pool.end() + cb() + })) + })) +}) + +suite.test('connection-level errors cause future queries to fail', (cb) => { + const pool = new pg.Pool() + pool.connect(assert.success((client, done) => { + client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => { + if (helper.args.native) { + assert.ok(err) + } else { + assert.equal(err.code, '57P01') + } + })) + + pool.once('error', assert.calls((err, brokenClient) => { + assert.equal(client, brokenClient) + + client.query('SELECT 1', assert.calls((err) => { + assert.equal(err.message, 'Client has encountered a connection error and is not queryable') + + done() + pool.end() + cb() + })) + })) + })) +})