diff --git a/lib/client.js b/lib/client.js index 3601787c7..517f5cc14 100644 --- a/lib/client.js +++ b/lib/client.js @@ -44,6 +44,7 @@ var Client = function (config) { stream: c.stream, ssl: this.connectionParameters.ssl, keepAlive: c.keepAlive || false, + keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, encoding: this.connectionParameters.client_encoding || 'utf8' }) this.queryQueue = [] @@ -51,6 +52,7 @@ var Client = function (config) { this.processID = null this.secretKey = null this.ssl = this.connectionParameters.ssl || false + this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 } util.inherits(Client, EventEmitter) @@ -83,6 +85,14 @@ Client.prototype._connect = function (callback) { } this._connecting = true + var connectionTimeoutHandle + if (this._connectionTimeoutMillis > 0) { + connectionTimeoutHandle = setTimeout(() => { + con._ending = true + con.stream.destroy(new Error('timeout expired')) + }, this._connectionTimeoutMillis) + } + if (this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port) } else { @@ -159,6 +169,7 @@ Client.prototype._connect = function (callback) { return } this._connectionError = true + clearTimeout(connectionTimeoutHandle) if (callback) { return callback(err) } @@ -196,6 +207,7 @@ Client.prototype._connect = function (callback) { con.removeListener('errorMessage', connectingErrorHandler) con.on('error', connectedErrorHandler) con.on('errorMessage', connectedErrorMessageHandler) + clearTimeout(connectionTimeoutHandle) // process possible callback argument to Client#connect if (callback) { diff --git a/lib/connection-parameters.js b/lib/connection-parameters.js index 745311ad0..f45dc50a4 100644 --- a/lib/connection-parameters.js +++ b/lib/connection-parameters.js @@ -66,6 +66,22 @@ var ConnectionParameters = function (config) { this.fallback_application_name = val('fallback_application_name', config, false) this.statement_timeout = val('statement_timeout', config, false) this.query_timeout = val('query_timeout', config, false) + + if (config.connectionTimeoutMillis === undefined) { + this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0 + } else { + this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000) + } + + if (config.keepAlive === false) { + this.keepalives = 0 + } else if (config.keepAlive === true) { + this.keepalives = 1 + } + + if (typeof config.keepAliveInitialDelayMillis === 'number') { + this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) + } } // Convert arg to a string, surround in single quotes, and escape single quotes and backslashes @@ -75,7 +91,7 @@ var quoteParamValue = function (value) { var add = function (params, config, paramName) { var value = config[paramName] - if (value) { + if (value !== undefined && value !== null) { params.push(paramName + '=' + quoteParamValue(value)) } } @@ -87,8 +103,9 @@ ConnectionParameters.prototype.getLibpqConnectionString = function (cb) { add(params, this, 'port') add(params, this, 'application_name') add(params, this, 'fallback_application_name') + add(params, this, 'connect_timeout') - var ssl = typeof this.ssl === 'object' ? this.ssl : { sslmode: this.ssl } + var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {} add(params, ssl, 'sslmode') add(params, ssl, 'sslca') add(params, ssl, 'sslkey') diff --git a/lib/connection.js b/lib/connection.js index 1f0af2f11..1ac2f668f 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -21,6 +21,7 @@ var Connection = function (config) { config = config || {} this.stream = config.stream || new net.Socket() this._keepAlive = config.keepAlive + this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis this.lastBuffer = false this.lastOffset = 0 this.buffer = null @@ -47,17 +48,17 @@ var Connection = function (config) { util.inherits(Connection, EventEmitter) Connection.prototype.connect = function (port, host) { + var self = this + if (this.stream.readyState === 'closed') { this.stream.connect(port, host) } else if (this.stream.readyState === 'open') { this.emit('connect') } - var self = this - this.stream.on('connect', function () { if (self._keepAlive) { - self.stream.setKeepAlive(true) + self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) } self.emit('connect') }) diff --git a/lib/defaults.js b/lib/defaults.js index 6b0a98f1b..c520eb56b 100644 --- a/lib/defaults.js +++ b/lib/defaults.js @@ -58,7 +58,13 @@ module.exports = { statement_timeout: false, // max miliseconds to wait for query to complete (client side) - query_timeout: false + query_timeout: false, + + connect_timeout: 0, + + keepalives: 1, + + keepalives_idle: 0 } var pgTypes = require('pg-types') diff --git a/test/integration/client/connection-timeout-tests.js b/test/integration/client/connection-timeout-tests.js new file mode 100644 index 000000000..35e418858 --- /dev/null +++ b/test/integration/client/connection-timeout-tests.js @@ -0,0 +1,85 @@ +'use strict' +const net = require('net') +const buffers = require('../../test-buffers') +const helper = require('./test-helper') + +const suite = new helper.Suite() + +const options = { + host: 'localhost', + port: 54321, + connectionTimeoutMillis: 2000, + user: 'not', + database: 'existing' +} + +const serverWithConnectionTimeout = (timeout, callback) => { + const sockets = new Set() + + const server = net.createServer(socket => { + sockets.add(socket) + socket.once('end', () => sockets.delete(socket)) + + socket.on('data', data => { + // deny request for SSL + if (data.length === 8) { + socket.write(Buffer.from('N', 'utf8')) + // consider all authentication requests as good + } else if (!data[0]) { + socket.write(buffers.authenticationOk()) + // send ReadyForQuery `timeout` ms after authentication + setTimeout(() => socket.write(buffers.readyForQuery()), timeout).unref() + // respond with our canned response + } else { + socket.write(buffers.readyForQuery()) + } + }) + }) + + let closing = false + const closeServer = done => { + if (closing) return + closing = true + + server.close(done) + for (const socket of sockets) { + socket.destroy() + } + } + + server.listen(options.port, options.host, () => callback(closeServer)) +} + +suite.test('successful connection', done => { + serverWithConnectionTimeout(0, closeServer => { + const timeoutId = setTimeout(() => { + throw new Error('Client should have connected successfully but it did not.') + }, 3000) + + const client = new helper.Client(options) + client.connect() + .then(() => client.end()) + .then(() => closeServer(done)) + .catch(err => closeServer(() => done(err))) + .then(() => clearTimeout(timeoutId)) + }) +}) + +suite.test('expired connection timeout', done => { + serverWithConnectionTimeout(options.connectionTimeoutMillis * 2, closeServer => { + const timeoutId = setTimeout(() => { + throw new Error('Client should have emitted an error but it did not.') + }, 3000) + + const client = new helper.Client(options) + client.connect() + .then(() => client.end()) + .then(() => closeServer(() => done(new Error('Connection timeout should have expired but it did not.')))) + .catch(err => { + assert(err instanceof Error) + assert(/timeout expired\s*/.test(err.message)) + closeServer(done) + }) + .then(() => clearTimeout(timeoutId)) + }) +}) diff --git a/test/test-helper.js b/test/test-helper.js index 2f39be2aa..4c14b8578 100644 --- a/test/test-helper.js +++ b/test/test-helper.js @@ -134,7 +134,7 @@ var expect = function (callback, timeout) { assert.ok(executed, 'Expected execution of function to be fired within ' + timeout + ' milliseconds ' + - +' (hint: export TEST_TIMEOUT=' + + ' (hint: export TEST_TIMEOUT=' + ' to change timeout globally)' + callback.toString()) }, timeout) diff --git a/test/unit/client/set-keepalives.js b/test/unit/client/set-keepalives.js new file mode 100644 index 000000000..55ff04f39 --- /dev/null +++ b/test/unit/client/set-keepalives.js @@ -0,0 +1,32 @@ +'use strict' +const net = require('net') +const pg = require('../../../lib/index.js') +const helper = require('./test-helper') + +const suite = new helper.Suite() + +suite.test('setting keep alive', done => { + const server = net.createServer(c => { + c.destroy() + server.close() + }) + + server.listen(7777, () => { + const stream = new net.Socket() + stream.setKeepAlive = (enable, initialDelay) => { + assert(enable === true) + assert(initialDelay === 10000) + done() + } + + const client = new pg.Client({ + host: 'localhost', + port: 7777, + keepAlive: true, + keepAliveInitialDelayMillis: 10000, + stream + }) + + client.connect().catch(() => {}) + }) +})