diff --git a/lib/client.js b/lib/client.js index 625aad682..8e7d307f7 100644 --- a/lib/client.js +++ b/lib/client.js @@ -399,15 +399,20 @@ Client.prototype.query = function (config, values, callback) { // can take in strings, config object or query object var query var result + var readTimeout + var readTimeoutTimer + var queryCallback if (config === null || config === undefined) { throw new TypeError('Client was passed a null or undefined query') } else if (typeof config.submit === 'function') { + readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config if (typeof values === 'function') { query.callback = query.callback || values } } else { + readTimeout = this.connectionParameters.query_timeout query = new Query(config, values, callback) if (!query.callback) { result = new this._Promise((resolve, reject) => { @@ -416,6 +421,37 @@ Client.prototype.query = function (config, values, callback) { } } + if (readTimeout) { + queryCallback = query.callback + + readTimeoutTimer = setTimeout(() => { + var error = new Error('Query read timeout') + + process.nextTick(() => { + query.handleError(error, this.connection) + }) + + queryCallback(error) + + // we already returned an error, + // just do nothing if query completes + query.callback = () => {} + + // Remove from queue + var index = this.queryQueue.indexOf(query) + if (index > -1) { + this.queryQueue.splice(index, 1) + } + + this._pulseQueryQueue() + }, readTimeout) + + query.callback = (err, res) => { + clearTimeout(readTimeoutTimer) + queryCallback(err, res) + } + } + if (this.binary && !query.binary) { query.binary = true } diff --git a/lib/connection-parameters.js b/lib/connection-parameters.js index 19267058f..745311ad0 100644 --- a/lib/connection-parameters.js +++ b/lib/connection-parameters.js @@ -65,6 +65,7 @@ var ConnectionParameters = function (config) { this.application_name = val('application_name', config, 'PGAPPNAME') this.fallback_application_name = val('fallback_application_name', config, false) this.statement_timeout = val('statement_timeout', config, false) + this.query_timeout = val('query_timeout', config, false) } // Convert arg to a string, surround in single quotes, and escape single quotes and backslashes diff --git a/lib/defaults.js b/lib/defaults.js index 30214b1d8..6b0a98f1b 100644 --- a/lib/defaults.js +++ b/lib/defaults.js @@ -55,7 +55,10 @@ module.exports = { parseInputDatesAsUTC: false, // max milliseconds any query using this connection will execute for before timing out in error. false=unlimited - statement_timeout: false + statement_timeout: false, + + // max miliseconds to wait for query to complete (client side) + query_timeout: false } var pgTypes = require('pg-types') diff --git a/lib/native/client.js b/lib/native/client.js index c88bfb12e..5338f7f10 100644 --- a/lib/native/client.js +++ b/lib/native/client.js @@ -146,14 +146,21 @@ Client.prototype.connect = function (callback) { Client.prototype.query = function (config, values, callback) { var query var result - - if (typeof config.submit === 'function') { + var readTimeout + var readTimeoutTimer + var queryCallback + + if (config === null || config === undefined) { + throw new TypeError('Client was passed a null or undefined query') + } else if (typeof config.submit === 'function') { + readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config // accept query(new Query(...), (err, res) => { }) style if (typeof values === 'function') { config.callback = values } } else { + readTimeout = this.connectionParameters.query_timeout query = new NativeQuery(config, values, callback) if (!query.callback) { let resolveOut, rejectOut @@ -165,6 +172,37 @@ Client.prototype.query = function (config, values, callback) { } } + if (readTimeout) { + queryCallback = query.callback + + readTimeoutTimer = setTimeout(() => { + var error = new Error('Query read timeout') + + process.nextTick(() => { + query.handleError(error, this.connection) + }) + + queryCallback(error) + + // we already returned an error, + // just do nothing if query completes + query.callback = () => {} + + // Remove from queue + var index = this._queryQueue.indexOf(query) + if (index > -1) { + this._queryQueue.splice(index, 1) + } + + this._pulseQueryQueue() + }, readTimeout) + + query.callback = (err, res) => { + clearTimeout(readTimeoutTimer) + queryCallback(err, res) + } + } + if (!this._queryable) { query.native = this.native process.nextTick(() => { diff --git a/test/integration/client/api-tests.js b/test/integration/client/api-tests.js index e18c3749c..c274bbd36 100644 --- a/test/integration/client/api-tests.js +++ b/test/integration/client/api-tests.js @@ -15,6 +15,47 @@ suite.test('pool callback behavior', done => { }) }) +suite.test('query timeout', (cb) => { + const pool = new pg.Pool({query_timeout: 1000}) + pool.connect().then((client) => { + client.query('SELECT pg_sleep(2)', assert.calls(function (err, result) { + assert(err) + assert(err.message === 'Query read timeout') + client.release() + pool.end(cb) + })) + }) +}) + +suite.test('query recover from timeout', (cb) => { + const pool = new pg.Pool({query_timeout: 1000}) + pool.connect().then((client) => { + client.query('SELECT pg_sleep(20)', assert.calls(function (err, result) { + assert(err) + assert(err.message === 'Query read timeout') + client.release(err) + pool.connect().then((client) => { + client.query('SELECT 1', assert.calls(function (err, result) { + assert(!err) + client.release(err) + pool.end(cb) + })) + }) + })) + }) +}) + +suite.test('query no timeout', (cb) => { + const pool = new pg.Pool({query_timeout: 10000}) + pool.connect().then((client) => { + client.query('SELECT pg_sleep(1)', assert.calls(function (err, result) { + assert(!err) + client.release() + pool.end(cb) + })) + }) +}) + suite.test('callback API', done => { const client = new helper.Client() client.query('CREATE TEMP TABLE peep(name text)')