From 5bc014625a4a5372eb4a7c7e90babb90c1b47d5d Mon Sep 17 00:00:00 2001 From: Bluenix Date: Thu, 27 May 2021 18:31:30 +0200 Subject: [PATCH 1/6] Turn Cursor into an ES6 class --- packages/pg-cursor/index.js | 383 ++++++++++++++++++------------------ 1 file changed, 192 insertions(+), 191 deletions(-) diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index ca86c9e45..0b5b1e0da 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -6,230 +6,231 @@ const util = require('util') let nextUniqueID = 1 // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl -function Cursor(text, values, config) { - EventEmitter.call(this) - - this._conf = config || {} - this.text = text - this.values = values ? values.map(prepare) : null - this.connection = null - this._queue = [] - this.state = 'initialized' - this._result = new Result(this._conf.rowMode, this._conf.types) - this._cb = null - this._rows = null - this._portal = null - this._ifNoData = this._ifNoData.bind(this) - this._rowDescription = this._rowDescription.bind(this) -} - -util.inherits(Cursor, EventEmitter) - -Cursor.prototype._ifNoData = function () { - this.state = 'idle' - this._shiftQueue() - if (this.connection) { - this.connection.removeListener('rowDescription', this._rowDescription) +class Cursor extends EventEmitter { + + constructor(text, values, config) { + super() + + this._conf = config || {} + this.text = text + this.values = values ? values.map(prepare) : null + this.connection = null + this._queue = [] + this.state = 'initialized' + this._result = new Result(this._conf.rowMode, this._conf.types) + this._cb = null + this._rows = null + this._portal = null + this._ifNoData = this._ifNoData.bind(this) + this._rowDescription = this._rowDescription.bind(this) } -} -Cursor.prototype._rowDescription = function () { - if (this.connection) { - this.connection.removeListener('noData', this._ifNoData) + _ifNoData() { + this.state = 'idle' + this._shiftQueue() + if (this.connection) { + this.connection.removeListener('rowDescription', this._rowDescription) + } } -} - -Cursor.prototype.submit = function (connection) { - this.state = 'submitted' - this.connection = connection - this._portal = 'C_' + nextUniqueID++ - - const con = connection - con.parse( - { - text: this.text, - }, - true - ) - - con.bind( - { - portal: this._portal, - values: this.values, - }, - true - ) - - con.describe( - { - type: 'P', - name: this._portal, // AWS Redshift requires a portal name - }, - true - ) - - con.flush() - - if (this._conf.types) { - this._result._getTypeParser = this._conf.types.getTypeParser + _rowDescription() { + if (this.connection) { + this.connection.removeListener('noData', this._ifNoData) + } } - con.once('noData', this._ifNoData) - con.once('rowDescription', this._rowDescription) -} + submit(connection) { + this.state = 'submitted' + this.connection = connection + this._portal = 'C_' + nextUniqueID++ + + const con = connection + + con.parse( + { + text: this.text, + }, + true + ) + + con.bind( + { + portal: this._portal, + values: this.values, + }, + true + ) + + con.describe( + { + type: 'P', + name: this._portal, // AWS Redshift requires a portal name + }, + true + ) + + con.flush() + + if (this._conf.types) { + this._result._getTypeParser = this._conf.types.getTypeParser + } -Cursor.prototype._shiftQueue = function () { - if (this._queue.length) { - this._getRows.apply(this, this._queue.shift()) + con.once('noData', this._ifNoData) + con.once('rowDescription', this._rowDescription) } -} - -Cursor.prototype._closePortal = function () { - // because we opened a named portal to stream results - // we need to close the same named portal. Leaving a named portal - // open can lock tables for modification if inside a transaction. - // see https://github.com/brianc/node-pg-cursor/issues/56 - this.connection.close({ type: 'P', name: this._portal }) - // If we've received an error we already sent a sync message. - // do not send another sync as it triggers another readyForQuery message. - if (this.state !== 'error') { - this.connection.sync() + _shiftQueue() { + if (this._queue.length) { + this._getRows.apply(this, this._queue.shift()) + } } -} -Cursor.prototype.handleRowDescription = function (msg) { - this._result.addFields(msg.fields) - this.state = 'idle' - this._shiftQueue() -} - -Cursor.prototype.handleDataRow = function (msg) { - const row = this._result.parseRow(msg.fields) - this.emit('row', row, this._result) - this._rows.push(row) -} + _closePortal() { + // because we opened a named portal to stream results + // we need to close the same named portal. Leaving a named portal + // open can lock tables for modification if inside a transaction. + // see https://github.com/brianc/node-pg-cursor/issues/56 + this.connection.close({ type: 'P', name: this._portal }) -Cursor.prototype._sendRows = function () { - this.state = 'idle' - setImmediate(() => { - const cb = this._cb - // remove callback before calling it - // because likely a new one will be added - // within the call to this callback - this._cb = null - if (cb) { - this._result.rows = this._rows - cb(null, this._rows, this._result) + // If we've received an error we already sent a sync message. + // do not send another sync as it triggers another readyForQuery message. + if (this.state !== 'error') { + this.connection.sync() } - this._rows = [] - }) -} - -Cursor.prototype.handleCommandComplete = function (msg) { - this._result.addCommandComplete(msg) - this._closePortal() -} - -Cursor.prototype.handlePortalSuspended = function () { - this._sendRows() -} - -Cursor.prototype.handleReadyForQuery = function () { - this._sendRows() - this.state = 'done' - this.emit('end', this._result) -} + } -Cursor.prototype.handleEmptyQuery = function () { - this.connection.sync() -} + handleRowDescription(msg) { + this._result.addFields(msg.fields) + this.state = 'idle' + this._shiftQueue() + } -Cursor.prototype.handleError = function (msg) { - // If we're in an initialized state we've never been submitted - // and don't have a connection instance reference yet. - // This can happen if you queue a stream and close the client before - // the client has submitted the stream. In this scenario we don't have - // a connection so there's nothing to unsubscribe from. - if (this.state !== 'initialized') { - this.connection.removeListener('noData', this._ifNoData) - this.connection.removeListener('rowDescription', this._rowDescription) - // call sync to trigger a readyForQuery - this.connection.sync() + handleDataRow(msg) { + const row = this._result.parseRow(msg.fields) + this.emit('row', row, this._result) + this._rows.push(row) } - this.state = 'error' - this._error = msg - // satisfy any waiting callback - if (this._cb) { - this._cb(msg) + _sendRows() { + this.state = 'idle' + setImmediate(() => { + const cb = this._cb + // remove callback before calling it + // because likely a new one will be added + // within the call to this callback + this._cb = null + if (cb) { + this._result.rows = this._rows + cb(null, this._rows, this._result) + } + this._rows = [] + }) } - // dispatch error to all waiting callbacks - for (let i = 0; i < this._queue.length; i++) { - this._queue.pop()[1](msg) + + handleCommandComplete(msg) { + this._result.addCommandComplete(msg) + this._closePortal() } - if (this.listenerCount('error') > 0) { - // only dispatch error events if we have a listener - this.emit('error', msg) + handlePortalSuspended() { + this._sendRows() } -} -Cursor.prototype._getRows = function (rows, cb) { - this.state = 'busy' - this._cb = cb - this._rows = [] - const msg = { - portal: this._portal, - rows: rows, + handleReadyForQuery() { + this._sendRows() + this.state = 'done' + this.emit('end', this._result) } - this.connection.execute(msg, true) - this.connection.flush() -} -// users really shouldn't be calling 'end' here and terminating a connection to postgres -// via the low level connection.end api -Cursor.prototype.end = util.deprecate(function (cb) { - if (this.state !== 'initialized') { + handleEmptyQuery() { this.connection.sync() } - this.connection.once('end', cb) - this.connection.end() -}, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.') -Cursor.prototype.close = function (cb) { - if (!this.connection || this.state === 'done') { - if (cb) { - return setImmediate(cb) - } else { - return + handleError(msg) { + // If we're in an initialized state we've never been submitted + // and don't have a connection instance reference yet. + // This can happen if you queue a stream and close the client before + // the client has submitted the stream. In this scenario we don't have + // a connection so there's nothing to unsubscribe from. + if (this.state !== 'initialized') { + this.connection.removeListener('noData', this._ifNoData) + this.connection.removeListener('rowDescription', this._rowDescription) + // call sync to trigger a readyForQuery + this.connection.sync() } - } - this._closePortal() - this.state = 'done' - if (cb) { - this.connection.once('readyForQuery', function () { - cb() - }) - } -} + this.state = 'error' + this._error = msg + // satisfy any waiting callback + if (this._cb) { + this._cb(msg) + } + // dispatch error to all waiting callbacks + for (let i = 0; i < this._queue.length; i++) { + this._queue.pop()[1](msg) + } -Cursor.prototype.read = function (rows, cb) { - if (this.state === 'idle' || this.state === 'submitted') { - return this._getRows(rows, cb) + if (this.listenerCount('error') > 0) { + // only dispatch error events if we have a listener + this.emit('error', msg) + } } - if (this.state === 'busy' || this.state === 'initialized') { - return this._queue.push([rows, cb]) + + _getRows(rows, cb) { + this.state = 'busy' + this._cb = cb + this._rows = [] + const msg = { + portal: this._portal, + rows: rows, + } + this.connection.execute(msg, true) + this.connection.flush() } - if (this.state === 'error') { - return setImmediate(() => cb(this._error)) + + // users really shouldn't be calling 'end' here and terminating a connection to postgres + // via the low level connection.end api + end = util.deprecate(function (cb) { + if (this.state !== 'initialized') { + this.connection.sync() + } + this.connection.once('end', cb) + this.connection.end() + }, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.') + + close(cb) { + if (!this.connection || this.state === 'done') { + if (cb) { + return setImmediate(cb) + } else { + return + } + } + + this._closePortal() + this.state = 'done' + if (cb) { + this.connection.once('readyForQuery', function () { + cb() + }) + } } - if (this.state === 'done') { - return setImmediate(() => cb(null, [])) - } else { - throw new Error('Unknown state: ' + this.state) + + read(rows, cb) { + if (this.state === 'idle' || this.state === 'submitted') { + return this._getRows(rows, cb) + } + if (this.state === 'busy' || this.state === 'initialized') { + return this._queue.push([rows, cb]) + } + if (this.state === 'error') { + return setImmediate(() => cb(this._error)) + } + if (this.state === 'done') { + return setImmediate(() => cb(null, [])) + } else { + throw new Error('Unknown state: ' + this.state) + } } } From a46726c4f999a396a5c3aac62edf0a4762484dac Mon Sep 17 00:00:00 2001 From: Bluenix Date: Fri, 28 May 2021 18:21:28 +0200 Subject: [PATCH 2/6] Fix incorrect syntax in Cursor.end() --- packages/pg-cursor/index.js | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index 0b5b1e0da..e9fe6bee9 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -190,13 +190,15 @@ class Cursor extends EventEmitter { // users really shouldn't be calling 'end' here and terminating a connection to postgres // via the low level connection.end api - end = util.deprecate(function (cb) { - if (this.state !== 'initialized') { - this.connection.sync() - } - this.connection.once('end', cb) - this.connection.end() - }, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.') + end(cb) { + return util.deprecate(function () { + if (this.state !== 'initialized') { + this.connection.sync() + } + this.connection.once('end', cb) + this.connection.end() + }, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')() + } close(cb) { if (!this.connection || this.state === 'done') { From 3da324d823134e8c5409490b696857d700fd3b54 Mon Sep 17 00:00:00 2001 From: Bluenix Date: Fri, 28 May 2021 18:35:18 +0200 Subject: [PATCH 3/6] Remove extraneous empty line --- packages/pg-cursor/index.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index e9fe6bee9..a1b7efff3 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -7,7 +7,6 @@ const util = require('util') let nextUniqueID = 1 // concept borrowed from org.postgresql.core.v3.QueryExecutorImpl class Cursor extends EventEmitter { - constructor(text, values, config) { super() From 5173fed805c20fe7aa7b94cd2e40c03b0d9574e5 Mon Sep 17 00:00:00 2001 From: Bluenix Date: Sat, 29 May 2021 19:22:45 +0200 Subject: [PATCH 4/6] Revert es6 change for end() --- packages/pg-cursor/index.js | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index a1b7efff3..eee6413b8 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -187,18 +187,6 @@ class Cursor extends EventEmitter { this.connection.flush() } - // users really shouldn't be calling 'end' here and terminating a connection to postgres - // via the low level connection.end api - end(cb) { - return util.deprecate(function () { - if (this.state !== 'initialized') { - this.connection.sync() - } - this.connection.once('end', cb) - this.connection.end() - }, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')() - } - close(cb) { if (!this.connection || this.state === 'done') { if (cb) { @@ -235,4 +223,15 @@ class Cursor extends EventEmitter { } } +// we can't correctly use util.deprecate in an ES6 method +Cursor.prototype.end = util.deprecate(function (cb) { + // users really shouldn't be calling 'end' here and terminating a connection to postgres + // via the low level connection.end api + if (this.state !== 'initialized') { + this.connection.sync() + } + this.connection.once('end', cb) + this.connection.end() +}, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.') + module.exports = Cursor From f47c3d1c3bb0c2fd0bc8be419f40fb4b1b9c7f15 Mon Sep 17 00:00:00 2001 From: Bluenix Date: Mon, 31 May 2021 20:26:13 +0200 Subject: [PATCH 5/6] Revert back to defining the end() method inside the class --- packages/pg-cursor/index.js | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index eee6413b8..3c3b48073 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -187,6 +187,16 @@ class Cursor extends EventEmitter { this.connection.flush() } + // users really shouldn't be calling 'end' here and terminating a connection to postgres + // via the low level connection.end api + end(cb) { + if (this.state !== 'initialized') { + this.connection.sync() + } + this.connection.once('end', cb) + this.connection.end() + } + close(cb) { if (!this.connection || this.state === 'done') { if (cb) { @@ -223,15 +233,6 @@ class Cursor extends EventEmitter { } } -// we can't correctly use util.deprecate in an ES6 method -Cursor.prototype.end = util.deprecate(function (cb) { - // users really shouldn't be calling 'end' here and terminating a connection to postgres - // via the low level connection.end api - if (this.state !== 'initialized') { - this.connection.sync() - } - this.connection.once('end', cb) - this.connection.end() -}, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.') +Cursor.prototype.end = util.deprecate(Cursor.prototype.end, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.') module.exports = Cursor From beb12b47142010d611e7b840bf7b9187bdb5237b Mon Sep 17 00:00:00 2001 From: Bluenix Date: Mon, 14 Jun 2021 00:26:46 +0200 Subject: [PATCH 6/6] Use hanging indent to satisfy Prettier --- packages/pg-cursor/index.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index 3c3b48073..8e8552be8 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -233,6 +233,9 @@ class Cursor extends EventEmitter { } } -Cursor.prototype.end = util.deprecate(Cursor.prototype.end, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.') +Cursor.prototype.end = util.deprecate( + Cursor.prototype.end, + 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.' +) module.exports = Cursor