Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queued query errors #1503

Merged
merged 11 commits into from Oct 3, 2018
Merged
125 changes: 93 additions & 32 deletions lib/client.js
Expand Up @@ -36,6 +36,7 @@ var Client = function (config) {
this._connecting = false
this._connected = false
this._connectionError = false
this._queryable = true

this.connection = c.connection || new Connection({
stream: c.stream,
Expand All @@ -52,16 +53,31 @@ var Client = function (config) {

util.inherits(Client, EventEmitter)

Client.prototype.connect = function (callback) {
Client.prototype._errorAllQueries = function (err) {
const enqueueError = (query) => {
process.nextTick(() => {
query.handleError(err, this.connection)
})
}

if (this.activeQuery) {
enqueueError(this.activeQuery)
this.activeQuery = null
}

this.queryQueue.forEach(enqueueError)
this.queryQueue.length = 0
}

Client.prototype._connect = function (callback) {
var self = this
var con = this.connection
if (this._connecting || this._connected) {
const err = new Error('Client has already been connected. You cannot reuse a client.')
if (callback) {
process.nextTick(() => {
callback(err)
return undefined
}
return Promise.reject(err)
})
return
}
this._connecting = true

Expand Down Expand Up @@ -126,15 +142,25 @@ Client.prototype.connect = function (callback) {
}

const connectedErrorHandler = (err) => {
if (this.activeQuery) {
var activeQuery = self.activeQuery
this.activeQuery = null
return activeQuery.handleError(err, con)
}
this._queryable = false
this._errorAllQueries(err)
this.emit('error', err)
}

const connectedErrorMessageHandler = (msg) => {
const activeQuery = this.activeQuery

if (!activeQuery) {
connectedErrorHandler(msg)
return
}

this.activeQuery = null
activeQuery.handleError(msg, con)
}

con.on('error', connectingErrorHandler)
con.on('errorMessage', connectingErrorHandler)

// hook up query handling events to connection
// after the connection initially becomes ready for queries
Expand All @@ -143,7 +169,9 @@ Client.prototype.connect = function (callback) {
self._connected = true
self._attachListeners(con)
con.removeListener('error', connectingErrorHandler)
con.removeListener('errorMessage', connectingErrorHandler)
con.on('error', connectedErrorHandler)
con.on('errorMessage', connectedErrorMessageHandler)

// process possible callback argument to Client#connect
if (callback) {
Expand All @@ -166,43 +194,53 @@ Client.prototype.connect = function (callback) {
})

con.once('end', () => {
if (this.activeQuery) {
var disconnectError = new Error('Connection terminated')
this.activeQuery.handleError(disconnectError, con)
this.activeQuery = null
}
const error = this._ending
? new Error('Connection terminated')
: new Error('Connection terminated unexpectedly')

this._errorAllQueries(error)

if (!this._ending) {
// if the connection is ended without us calling .end()
// on this client then we have an unexpected disconnection
// treat this as an error unless we've already emitted an error
// during connection.
const error = new Error('Connection terminated unexpectedly')
if (this._connecting && !this._connectionError) {
if (callback) {
callback(error)
} else {
this.emit('error', error)
connectedErrorHandler(error)
}
} else if (!this._connectionError) {
this.emit('error', error)
connectedErrorHandler(error)
}
}
this.emit('end')

process.nextTick(() => {
this.emit('end')
})
})

con.on('notice', function (msg) {
self.emit('notice', msg)
})
}

if (!callback) {
return new global.Promise((resolve, reject) => {
this.once('error', reject)
this.once('connect', () => {
this.removeListener('error', reject)
Client.prototype.connect = function (callback) {
if (callback) {
this._connect(callback)
return
}

return new Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve()
})
}
})
}
})
}

Client.prototype._attachListeners = function (con) {
Expand Down Expand Up @@ -353,7 +391,15 @@ Client.prototype._pulseQueryQueue = function () {
if (this.activeQuery) {
this.readyForQuery = false
this.hasExecuted = true
this.activeQuery.submit(this.connection)

const queryError = this.activeQuery.submit(this.connection)
if (queryError) {
process.nextTick(() => {
this.activeQuery.handleError(queryError, this.connection)
this.readyForQuery = true
this._pulseQueryQueue()
})
}
} else if (this.hasExecuted) {
this.activeQuery = null
this.emit('drain')
Expand Down Expand Up @@ -389,25 +435,40 @@ Client.prototype.query = function (config, values, callback) {
query._result._getTypeParser = this._types.getTypeParser.bind(this._types)
}

if (!this._queryable) {
process.nextTick(() => {
query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
})
return result
}

if (this._ending) {
process.nextTick(() => {
query.handleError(new Error('Client was closed and is not queryable'), this.connection)
})
return result
}

this.queryQueue.push(query)
this._pulseQueryQueue()
return result
}

Client.prototype.end = function (cb) {
this._ending = true

if (this.activeQuery) {
// if we have an active query we need to force a disconnect
// on the socket - otherwise a hung query could block end forever
this.connection.stream.destroy(new Error('Connection terminated by user'))
return cb ? cb() : Promise.resolve()
this.connection.stream.destroy()
} else {
this.connection.end()
}

if (cb) {
this.connection.end()
this.connection.once('end', cb)
} else {
return new global.Promise((resolve, reject) => {
this.connection.end()
return new Promise((resolve) => {
this.connection.once('end', resolve)
})
}
Expand Down
3 changes: 2 additions & 1 deletion lib/connection.js
Expand Up @@ -117,10 +117,11 @@ Connection.prototype.attachListeners = function (stream) {
var packet = self._reader.read()
while (packet) {
var msg = self.parseMessage(packet)
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (self._emitMessage) {
self.emit('message', msg)
}
self.emit(msg.name, msg)
self.emit(eventName, msg)
packet = self._reader.read()
}
})
Expand Down