Skip to content

Commit

Permalink
Added the missing connect_timeout and keepalives_idle config paramete…
Browse files Browse the repository at this point in the history
…rs (#1847)

* Added the missing connect_timeout and keepalives_idle config parameters

* Implementation and tests for keepAliveInitialDelayMillis and connectionTimeoutMillis [squashed 4]
  • Loading branch information
boromisp authored and brianc committed May 10, 2019
1 parent 4b530a9 commit 0993e4b
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 7 deletions.
12 changes: 12 additions & 0 deletions lib/client.js
Expand Up @@ -44,13 +44,15 @@ var Client = function (config) {
stream: c.stream,
ssl: this.connectionParameters.ssl,
keepAlive: c.keepAlive || false,
keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
encoding: this.connectionParameters.client_encoding || 'utf8'
})
this.queryQueue = []
this.binary = c.binary || defaults.binary
this.processID = null
this.secretKey = null
this.ssl = this.connectionParameters.ssl || false
this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0
}

util.inherits(Client, EventEmitter)
Expand Down Expand Up @@ -83,6 +85,14 @@ Client.prototype._connect = function (callback) {
}
this._connecting = true

var connectionTimeoutHandle
if (this._connectionTimeoutMillis > 0) {
connectionTimeoutHandle = setTimeout(() => {
con._ending = true
con.stream.destroy(new Error('timeout expired'))
}, this._connectionTimeoutMillis)
}

if (this.host && this.host.indexOf('/') === 0) {
con.connect(this.host + '/.s.PGSQL.' + this.port)
} else {
Expand Down Expand Up @@ -159,6 +169,7 @@ Client.prototype._connect = function (callback) {
return
}
this._connectionError = true
clearTimeout(connectionTimeoutHandle)
if (callback) {
return callback(err)
}
Expand Down Expand Up @@ -196,6 +207,7 @@ Client.prototype._connect = function (callback) {
con.removeListener('errorMessage', connectingErrorHandler)
con.on('error', connectedErrorHandler)
con.on('errorMessage', connectedErrorMessageHandler)
clearTimeout(connectionTimeoutHandle)

// process possible callback argument to Client#connect
if (callback) {
Expand Down
21 changes: 19 additions & 2 deletions lib/connection-parameters.js
Expand Up @@ -66,6 +66,22 @@ var ConnectionParameters = function (config) {
this.fallback_application_name = val('fallback_application_name', config, false)
this.statement_timeout = val('statement_timeout', config, false)
this.query_timeout = val('query_timeout', config, false)

if (config.connectionTimeoutMillis === undefined) {
this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0
} else {
this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000)
}

if (config.keepAlive === false) {
this.keepalives = 0
} else if (config.keepAlive === true) {
this.keepalives = 1
}

if (typeof config.keepAliveInitialDelayMillis === 'number') {
this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000)
}
}

// Convert arg to a string, surround in single quotes, and escape single quotes and backslashes
Expand All @@ -75,7 +91,7 @@ var quoteParamValue = function (value) {

var add = function (params, config, paramName) {
var value = config[paramName]
if (value) {
if (value !== undefined && value !== null) {
params.push(paramName + '=' + quoteParamValue(value))
}
}
Expand All @@ -87,8 +103,9 @@ ConnectionParameters.prototype.getLibpqConnectionString = function (cb) {
add(params, this, 'port')
add(params, this, 'application_name')
add(params, this, 'fallback_application_name')
add(params, this, 'connect_timeout')

var ssl = typeof this.ssl === 'object' ? this.ssl : { sslmode: this.ssl }
var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {}
add(params, ssl, 'sslmode')
add(params, ssl, 'sslca')
add(params, ssl, 'sslkey')
Expand Down
7 changes: 4 additions & 3 deletions lib/connection.js
Expand Up @@ -21,6 +21,7 @@ var Connection = function (config) {
config = config || {}
this.stream = config.stream || new net.Socket()
this._keepAlive = config.keepAlive
this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
this.lastBuffer = false
this.lastOffset = 0
this.buffer = null
Expand All @@ -47,17 +48,17 @@ var Connection = function (config) {
util.inherits(Connection, EventEmitter)

Connection.prototype.connect = function (port, host) {
var self = this

if (this.stream.readyState === 'closed') {
this.stream.connect(port, host)
} else if (this.stream.readyState === 'open') {
this.emit('connect')
}

var self = this

this.stream.on('connect', function () {
if (self._keepAlive) {
self.stream.setKeepAlive(true)
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
}
self.emit('connect')
})
Expand Down
8 changes: 7 additions & 1 deletion lib/defaults.js
Expand Up @@ -58,7 +58,13 @@ module.exports = {
statement_timeout: false,

// max miliseconds to wait for query to complete (client side)
query_timeout: false
query_timeout: false,

connect_timeout: 0,

keepalives: 1,

keepalives_idle: 0
}

var pgTypes = require('pg-types')
Expand Down
85 changes: 85 additions & 0 deletions test/integration/client/connection-timeout-tests.js
@@ -0,0 +1,85 @@
'use strict'
const net = require('net')
const buffers = require('../../test-buffers')
const helper = require('./test-helper')

const suite = new helper.Suite()

const options = {
host: 'localhost',
port: 54321,
connectionTimeoutMillis: 2000,
user: 'not',
database: 'existing'
}

const serverWithConnectionTimeout = (timeout, callback) => {
const sockets = new Set()

const server = net.createServer(socket => {
sockets.add(socket)
socket.once('end', () => sockets.delete(socket))

socket.on('data', data => {
// deny request for SSL
if (data.length === 8) {
socket.write(Buffer.from('N', 'utf8'))
// consider all authentication requests as good
} else if (!data[0]) {
socket.write(buffers.authenticationOk())
// send ReadyForQuery `timeout` ms after authentication
setTimeout(() => socket.write(buffers.readyForQuery()), timeout).unref()
// respond with our canned response
} else {
socket.write(buffers.readyForQuery())
}
})
})

let closing = false
const closeServer = done => {
if (closing) return
closing = true

server.close(done)
for (const socket of sockets) {
socket.destroy()
}
}

server.listen(options.port, options.host, () => callback(closeServer))
}

suite.test('successful connection', done => {
serverWithConnectionTimeout(0, closeServer => {
const timeoutId = setTimeout(() => {
throw new Error('Client should have connected successfully but it did not.')
}, 3000)

const client = new helper.Client(options)
client.connect()
.then(() => client.end())
.then(() => closeServer(done))
.catch(err => closeServer(() => done(err)))
.then(() => clearTimeout(timeoutId))
})
})

suite.test('expired connection timeout', done => {
serverWithConnectionTimeout(options.connectionTimeoutMillis * 2, closeServer => {
const timeoutId = setTimeout(() => {
throw new Error('Client should have emitted an error but it did not.')
}, 3000)

const client = new helper.Client(options)
client.connect()
.then(() => client.end())
.then(() => closeServer(() => done(new Error('Connection timeout should have expired but it did not.'))))
.catch(err => {
assert(err instanceof Error)
assert(/timeout expired\s*/.test(err.message))
closeServer(done)
})
.then(() => clearTimeout(timeoutId))
})
})
2 changes: 1 addition & 1 deletion test/test-helper.js
Expand Up @@ -134,7 +134,7 @@ var expect = function (callback, timeout) {
assert.ok(executed,
'Expected execution of function to be fired within ' + timeout +
' milliseconds ' +
+' (hint: export TEST_TIMEOUT=<timeout in milliseconds>' +
' (hint: export TEST_TIMEOUT=<timeout in milliseconds>' +
' to change timeout globally)' +
callback.toString())
}, timeout)
Expand Down
32 changes: 32 additions & 0 deletions test/unit/client/set-keepalives.js
@@ -0,0 +1,32 @@
'use strict'
const net = require('net')
const pg = require('../../../lib/index.js')
const helper = require('./test-helper')

const suite = new helper.Suite()

suite.test('setting keep alive', done => {
const server = net.createServer(c => {
c.destroy()
server.close()
})

server.listen(7777, () => {
const stream = new net.Socket()
stream.setKeepAlive = (enable, initialDelay) => {
assert(enable === true)
assert(initialDelay === 10000)
done()
}

const client = new pg.Client({
host: 'localhost',
port: 7777,
keepAlive: true,
keepAliveInitialDelayMillis: 10000,
stream
})

client.connect().catch(() => {})
})
})

0 comments on commit 0993e4b

Please sign in to comment.