diff --git a/lib/client.js b/lib/client.js index 140f505b2..1aadac08f 100644 --- a/lib/client.js +++ b/lib/client.js @@ -53,6 +53,22 @@ var Client = function (config) { util.inherits(Client, EventEmitter) +Client.prototype._errorAllQueries = function (err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.handleError(err, this.connection) + }) + } + + if (this.activeQuery) { + enqueueError(this.activeQuery) + this.activeQuery = null + } + + this.queryQueue.forEach(enqueueError) + this.queryQueue.length = 0 +} + Client.prototype._connect = function (callback) { var self = this var con = this.connection @@ -127,21 +143,7 @@ Client.prototype._connect = function (callback) { const connectedErrorHandler = (err) => { this._queryable = false - - const enqueueError = (query) => { - process.nextTick(() => { - query.handleError(err, con) - }) - } - - if (this.activeQuery) { - enqueueError(this.activeQuery) - this.activeQuery = null - } - - this.queryQueue.forEach(enqueueError) - this.queryQueue = [] - + this._errorAllQueries(err) this.emit('error', err) } @@ -192,17 +194,17 @@ Client.prototype._connect = function (callback) { }) con.once('end', () => { - if (this.activeQuery) { - var disconnectError = new Error('Connection terminated') - this.activeQuery.handleError(disconnectError, con) - this.activeQuery = null - } + const error = this._ending + ? new Error('Connection terminated') + : new Error('Connection terminated unexpectedly') + + this._errorAllQueries(error) + if (!this._ending) { // if the connection is ended without us calling .end() // on this client then we have an unexpected disconnection // treat this as an error unless we've already emitted an error // during connection. - const error = new Error('Connection terminated unexpectedly') if (this._connecting && !this._connectionError) { if (callback) { callback(error) @@ -213,6 +215,7 @@ Client.prototype._connect = function (callback) { connectedErrorHandler(error) } } + this.emit('end') }) diff --git a/lib/native/client.js b/lib/native/client.js index bed548ad8..c9dca63c6 100644 --- a/lib/native/client.js +++ b/lib/native/client.js @@ -32,8 +32,10 @@ var Client = module.exports = function (config) { }) this._queryQueue = [] - this._connected = false + this._ending = false this._connecting = false + this._connected = false + this._queryable = true // keep these on the object for legacy reasons // for the time being. TODO: deprecate all this jazz @@ -52,50 +54,48 @@ Client.Query = NativeQuery util.inherits(Client, EventEmitter) +Client.prototype._errorAllQueries = function (err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.native = this.native + query.handleError(err) + }) + } + + if (this._hasActiveQuery()) { + enqueueError(this._activeQuery) + this._activeQuery = null + } + + this._queryQueue.forEach(enqueueError) + this._queryQueue.length = 0 +} + // connect to the backend // pass an optional callback to be called once connected // or with an error if there was a connection error -// if no callback is passed and there is a connection error -// the client will emit an error event. -Client.prototype.connect = function (cb) { +Client.prototype._connect = function (cb) { var self = this - var onError = function (err) { - if (cb) return cb(err) - return self.emit('error', err) - } - - var result - if (!cb) { - var resolveOut, rejectOut - cb = (err) => err ? rejectOut(err) : resolveOut() - result = new global.Promise(function (resolve, reject) { - resolveOut = resolve - rejectOut = reject - }) - } - if (this._connecting) { process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.'))) - return result + return } this._connecting = true this.connectionParameters.getLibpqConnectionString(function (err, conString) { - if (err) return onError(err) + if (err) return cb(err) self.native.connect(conString, function (err) { - if (err) return onError(err) + if (err) return cb(err) // set internal states to connected self._connected = true // handle connection errors from the native layer self.native.on('error', function (err) { - // error will be handled by active query - if (self._activeQuery && self._activeQuery.state !== 'end') { - return - } + self._queryable = false + self._errorAllQueries(err) self.emit('error', err) }) @@ -110,12 +110,26 @@ Client.prototype.connect = function (cb) { self.emit('connect') self._pulseQueryQueue(true) - // possibly call the optional callback - if (cb) cb() + cb() }) }) +} - return result +Client.prototype.connect = function (callback) { + if (callback) { + this._connect(callback) + return + } + + return new Promise((resolve, reject) => { + this._connect((error) => { + if (error) { + reject(error) + } else { + resolve() + } + }) + }) } // send a query to the server @@ -129,26 +143,43 @@ Client.prototype.connect = function (cb) { // optional string rowMode = 'array' for an array of results // } Client.prototype.query = function (config, values, callback) { + var query + var result + if (typeof config.submit === 'function') { + result = query = config // accept query(new Query(...), (err, res) => { }) style if (typeof values === 'function') { config.callback = values } - this._queryQueue.push(config) - this._pulseQueryQueue() - return config + } else { + query = new NativeQuery(config, values, callback) + if (!query.callback) { + let resolveOut, rejectOut + result = new Promise((resolve, reject) => { + resolveOut = resolve + rejectOut = reject + }) + query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res) + } } - var query = new NativeQuery(config, values, callback) - var result - if (!query.callback) { - let resolveOut, rejectOut - result = new Promise((resolve, reject) => { - resolveOut = resolve - rejectOut = reject + if (!this._queryable) { + query.native = this.native + process.nextTick(() => { + query.handleError(new Error('Client has encountered a connection error and is not queryable')) + }) + return result + } + + if (this._ending) { + query.native = this.native + process.nextTick(() => { + query.handleError(new Error('Client was closed and is not queryable')) }) - query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res) + return result } + this._queryQueue.push(query) this._pulseQueryQueue() return result @@ -157,6 +188,9 @@ Client.prototype.query = function (config, values, callback) { // disconnect from the backend server Client.prototype.end = function (cb) { var self = this + + this._ending = true + if (!this._connected) { this.once('connect', this.end.bind(this, cb)) } @@ -170,12 +204,7 @@ Client.prototype.end = function (cb) { }) } this.native.end(function () { - // send an error to the active query - if (self._hasActiveQuery()) { - var msg = 'Connection terminated' - self._queryQueue.length = 0 - self._activeQuery.handleError(new Error(msg)) - } + self._errorAllQueries(new Error('Connection terminated')) self.emit('end') if (cb) cb() }) diff --git a/test/integration/client/error-handling-tests.js b/test/integration/client/error-handling-tests.js index 97b0ce83f..3d4679db9 100644 --- a/test/integration/client/error-handling-tests.js +++ b/test/integration/client/error-handling-tests.js @@ -73,12 +73,9 @@ suite.test('query receives error on client shutdown', function (done) { client.query(new pg.Query(config), assert.calls(function (err, res) { assert(err instanceof Error) queryError = err + done() })) setTimeout(() => client.end(), 50) - client.once('end', () => { - assert(queryError instanceof Error) - done() - }) })) }) diff --git a/test/integration/connection-pool/error-tests.js b/test/integration/connection-pool/error-tests.js index 4d7bb9c1f..30745473e 100644 --- a/test/integration/connection-pool/error-tests.js +++ b/test/integration/connection-pool/error-tests.js @@ -52,7 +52,11 @@ suite.test('connection-level errors cause queued queries to fail', (cb) => { const pool = new pg.Pool() pool.connect(assert.success((client, done) => { client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => { - assert.equal(err.code, '57P01') + if (helper.args.native) { + assert.ok(err) + } else { + assert.equal(err.code, '57P01') + } })) pool.once('error', assert.calls((err, brokenClient) => { @@ -60,7 +64,11 @@ suite.test('connection-level errors cause queued queries to fail', (cb) => { })) client.query('SELECT 1', assert.calls((err) => { - assert.equal(err.message, 'Connection terminated unexpectedly') + if (helper.args.native) { + assert.ok(/^server closed the connection unexpectedly/.test(err.message)) + } else { + assert.equal(err.message, 'Connection terminated unexpectedly') + } done() pool.end() @@ -73,7 +81,11 @@ suite.test('connection-level errors cause future queries to fail', (cb) => { const pool = new pg.Pool() pool.connect(assert.success((client, done) => { client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => { - assert.equal(err.code, '57P01') + if (helper.args.native) { + assert.ok(err) + } else { + assert.equal(err.code, '57P01') + } })) pool.once('error', assert.calls((err, brokenClient) => {