Skip to content

Commit

Permalink
feat(postgres): native upsert (#12301)
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantdhiman committed May 24, 2020
1 parent 8244a34 commit 4d9165b
Show file tree
Hide file tree
Showing 20 changed files with 260 additions and 304 deletions.
1 change: 0 additions & 1 deletion lib/dialects/abstract/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ AbstractDialect.prototype.supports = {
'DEFAULT VALUES': false,
'VALUES ()': false,
'LIMIT ON UPDATE': false,
'ON DUPLICATE KEY': true,
'ORDER NULLS': false,
'UNION': true,
'UNION ALL': true,
Expand Down
42 changes: 20 additions & 22 deletions lib/dialects/abstract/query-generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const util = require('util');
const _ = require('lodash');
const uuidv4 = require('uuid').v4;
const semver = require('semver');

const Utils = require('../../utils');
const deprecations = require('../../utils/deprecations');
Expand Down Expand Up @@ -179,6 +178,20 @@ class QueryGenerator {
}
}

let onDuplicateKeyUpdate = '';

if (this._dialect.supports.inserts.updateOnDuplicate && options.updateOnDuplicate) {
if (this._dialect.supports.inserts.updateOnDuplicate == ' ON CONFLICT DO UPDATE SET') { // postgres / sqlite
// If no conflict target columns were specified, use the primary key names from options.upsertKeys
const conflictKeys = options.upsertKeys.map(attr => this.quoteIdentifier(attr));
const updateKeys = options.updateOnDuplicate.map(attr => `${this.quoteIdentifier(attr)}=EXCLUDED.${this.quoteIdentifier(attr)}`);
onDuplicateKeyUpdate = ` ON CONFLICT (${conflictKeys.join(',')}) DO UPDATE SET ${updateKeys.join(',')}`;
} else {
const valueKeys = options.updateOnDuplicate.map(attr => `${this.quoteIdentifier(attr)}=VALUES(${this.quoteIdentifier(attr)})`);
onDuplicateKeyUpdate += `${this._dialect.supports.inserts.updateOnDuplicate} ${valueKeys.join(',')}`;
}
}

const replacements = {
ignoreDuplicates: options.ignoreDuplicates ? this._dialect.supports.inserts.ignoreDuplicates : '',
onConflictDoNothing: options.ignoreDuplicates ? this._dialect.supports.inserts.onConflictDoNothing : '',
Expand All @@ -188,8 +201,8 @@ class QueryGenerator {
tmpTable
};

valueQuery = `${tmpTable}INSERT${replacements.ignoreDuplicates} INTO ${quotedTable} (${replacements.attributes})${replacements.output} VALUES (${replacements.values})${replacements.onConflictDoNothing}${valueQuery}`;
emptyQuery = `${tmpTable}INSERT${replacements.ignoreDuplicates} INTO ${quotedTable}${replacements.output}${replacements.onConflictDoNothing}${emptyQuery}`;
valueQuery = `${tmpTable}INSERT${replacements.ignoreDuplicates} INTO ${quotedTable} (${replacements.attributes})${replacements.output} VALUES (${replacements.values})${onDuplicateKeyUpdate}${replacements.onConflictDoNothing}${valueQuery}`;
emptyQuery = `${tmpTable}INSERT${replacements.ignoreDuplicates} INTO ${quotedTable}${replacements.output}${onDuplicateKeyUpdate}${replacements.onConflictDoNothing}${emptyQuery}`;

// Mostly for internal use, so we expect the user to know what he's doing!
// pg_temp functions are private per connection, so we never risk this function interfering with another one.
Expand All @@ -200,31 +213,16 @@ class QueryGenerator {
returningModelAttributes.push('*');
}

if (semver.gte(this.sequelize.options.databaseVersion, '9.2.0')) {
// >= 9.2 - Use a UUID but prefix with 'func_' (numbers first not allowed)
const delimiter = `$func_${uuidv4().replace(/-/g, '')}$`;
const selectQuery = `SELECT (testfunc.response).${returningModelAttributes.join(', (testfunc.response).')}, testfunc.sequelize_caught_exception FROM pg_temp.testfunc();`;
const delimiter = `$func_${uuidv4().replace(/-/g, '')}$`;
const selectQuery = `SELECT (testfunc.response).${returningModelAttributes.join(', (testfunc.response).')}, testfunc.sequelize_caught_exception FROM pg_temp.testfunc();`;

options.exception = 'WHEN unique_violation THEN GET STACKED DIAGNOSTICS sequelize_caught_exception = PG_EXCEPTION_DETAIL;';
valueQuery = `CREATE OR REPLACE FUNCTION pg_temp.testfunc(OUT response ${quotedTable}, OUT sequelize_caught_exception text) RETURNS RECORD AS ${delimiter
} BEGIN ${valueQuery} RETURNING * INTO response; EXCEPTION ${options.exception} END ${delimiter} LANGUAGE plpgsql; ${selectQuery} ${dropFunction}`;
} else {
const selectQuery = `SELECT ${returningModelAttributes.join(', ')} FROM pg_temp.testfunc();`;

options.exception = 'WHEN unique_violation THEN NULL;';
valueQuery = `CREATE OR REPLACE FUNCTION pg_temp.testfunc() RETURNS SETOF ${quotedTable} AS $body$ BEGIN RETURN QUERY ${valueQuery
} RETURNING *; EXCEPTION ${options.exception} END; $body$ LANGUAGE plpgsql; ${selectQuery} ${dropFunction}`;
}
options.exception = 'WHEN unique_violation THEN GET STACKED DIAGNOSTICS sequelize_caught_exception = PG_EXCEPTION_DETAIL;';
valueQuery = `CREATE OR REPLACE FUNCTION pg_temp.testfunc(OUT response ${quotedTable}, OUT sequelize_caught_exception text) RETURNS RECORD AS ${delimiter} BEGIN ${valueQuery} RETURNING * INTO response; EXCEPTION ${options.exception} END ${delimiter} LANGUAGE plpgsql; ${selectQuery} ${dropFunction}`;
} else {
valueQuery += returningFragment;
emptyQuery += returningFragment;
}

if (this._dialect.supports['ON DUPLICATE KEY'] && options.onDuplicate) {
valueQuery += ` ON DUPLICATE KEY ${options.onDuplicate}`;
emptyQuery += ` ON DUPLICATE KEY ${options.onDuplicate}`;
}

query = `${replacements.attributes.length ? valueQuery : emptyQuery};`;
if (identityWrapperRequired && this._dialect.supports.autoIncrement.identityInsert) {
query = `SET IDENTITY_INSERT ${quotedTable} ON; ${query} SET IDENTITY_INSERT ${quotedTable} OFF;`;
Expand Down
84 changes: 31 additions & 53 deletions lib/dialects/abstract/query-interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const Utils = require('../../utils');
const DataTypes = require('../../data-types');
const Transaction = require('../../transaction');
const QueryTypes = require('../../query-types');
const Op = require('../../operators');

/**
* The interface that Sequelize uses to talk to all databases
Expand Down Expand Up @@ -744,72 +743,51 @@ class QueryInterface {
* @param {string} tableName table to upsert on
* @param {object} insertValues values to be inserted, mapped to field name
* @param {object} updateValues values to be updated, mapped to field name
* @param {object} where various conditions
* @param {Model} model Model to upsert on
* @param {object} where where conditions, which can be used for UPDATE part when INSERT fails
* @param {object} options query options
*
* @returns {Promise<boolean,?number>} Resolves an array with <created, primaryKey>
*/
async upsert(tableName, insertValues, updateValues, where, model, options) {
const wheres = [];
const attributes = Object.keys(insertValues);
let indexes = [];
let indexFields;

async upsert(tableName, insertValues, updateValues, where, options) {
options = { ...options };

if (!Utils.isWhereEmpty(where)) {
wheres.push(where);
}
const model = options.model;
const primaryKeys = Object.values(model.primaryKeys).map(item => item.field);
const uniqueKeys = Object.values(model.uniqueKeys).filter(c => c.fields.length >= 1).map(c => c.fields);
const indexKeys = Object.values(model._indexes).filter(c => c.unique && c.fields.length >= 1).map(c => c.fields);

// Lets combine unique keys and indexes into one
indexes = _.map(model.uniqueKeys, value => {
return value.fields;
});

model._indexes.forEach(value => {
if (value.unique) {
// fields in the index may both the strings or objects with an attribute property - lets sanitize that
indexFields = value.fields.map(field => {
if (_.isPlainObject(field)) {
return field.attribute;
}
return field;
});
indexes.push(indexFields);
options.type = QueryTypes.UPSERT;
options.updateOnDuplicate = Object.keys(updateValues);
options.upsertKeys = [];

// For fields in updateValues, try to find a constraint or unique index
// that includes given field. Only first matching upsert key is used.
for (const field of options.updateOnDuplicate) {
const uniqueKey = uniqueKeys.find(fields => fields.includes(field));
if (uniqueKey) {
options.upsertKeys = uniqueKey;
break;
}
});

for (const index of indexes) {
if (_.intersection(attributes, index).length === index.length) {
where = {};
for (const field of index) {
where[field] = insertValues[field];
}
wheres.push(where);
const indexKey = indexKeys.find(fields => fields.includes(field));
if (indexKey) {
options.upsertKeys = indexKey;
break;
}
}

where = { [Op.or]: wheres };

options.type = QueryTypes.UPSERT;
options.raw = true;
// Always use PK, if no constraint available OR update data contains PK
if (
options.upsertKeys.length === 0
|| _.intersection(options.updateOnDuplicate, primaryKeys).length
) {
options.upsertKeys = primaryKeys;
}

const sql = this.queryGenerator.upsertQuery(tableName, insertValues, updateValues, where, model, options);
const result = await this.sequelize.query(sql, options);
return this._convertUpsertResult(result, model);
}
options.upsertKeys = _.uniq(options.upsertKeys);

/**
* Converts raw upsert result to API contract.
*
* @param {object} result
* @param {Model} model
* @protected
*/
// eslint-disable-next-line no-unused-vars
_convertUpsertResult(result, model) {
return [result, undefined];
const sql = this.queryGenerator.insertQuery(tableName, insertValues, model.rawAttributes, options);
return await this.sequelize.query(sql, options);
}

/**
Expand Down
7 changes: 5 additions & 2 deletions lib/dialects/mariadb/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ class Query extends AbstractQuery {
formatResults(data) {
let result = this.instance;

if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()
|| this.isUpsertQuery()) {
if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()) {
return data.affectedRows;
}
if (this.isUpsertQuery()) {
return [null, data.affectedRows === 1];
}
if (this.isInsertQuery(data)) {
this.handleInsertQuery(data);

Expand All @@ -116,6 +118,7 @@ class Query extends AbstractQuery {
}
return [result, data.affectedRows];
}

return [data[this.getInsertIdField()], data.affectedRows];
}
}
Expand Down
42 changes: 37 additions & 5 deletions lib/dialects/mssql/query-interface.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
'use strict';

const _ = require('lodash');

const Utils = require('../../utils');
const QueryTypes = require('../../query-types');
const Op = require('../../operators');
const { QueryInterface } = require('../abstract/query-interface');

/**
Expand Down Expand Up @@ -42,11 +47,38 @@ class MSSqlQueryInterface extends QueryInterface {
/**
* @override
*/
_convertUpsertResult(result, model) {
return [
result.$action === 'INSERT',
result[model.primaryKeyField]
];
async upsert(tableName, insertValues, updateValues, where, options) {
const model = options.model;
const wheres = [];

options = { ...options };

if (!Utils.isWhereEmpty(where)) {
wheres.push(where);
}

// Lets combine unique keys and indexes into one
let indexes = Object.values(model.uniqueKeys).map(item => item.fields);
indexes = indexes.concat(Object.values(model._indexes).filter(item => item.unique).map(item => item.fields));

const attributes = Object.keys(insertValues);
for (const index of indexes) {
if (_.intersection(attributes, index).length === index.length) {
where = {};
for (const field of index) {
where[field] = insertValues[field];
}
wheres.push(where);
}
}

where = { [Op.or]: wheres };

options.type = QueryTypes.UPSERT;
options.raw = true;

const sql = this.queryGenerator.upsertQuery(tableName, insertValues, updateValues, where, model, options);
return await this.sequelize.query(sql, options);
}
}

Expand Down
6 changes: 3 additions & 3 deletions lib/dialects/mssql/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,6 @@ class Query extends AbstractQuery {
if (this.isShowIndexesQuery()) {
return this.handleShowIndexesQuery(data);
}
if (this.isUpsertQuery()) {
return data[0];
}
if (this.isCallQuery()) {
return data[0];
}
Expand All @@ -224,6 +221,9 @@ class Query extends AbstractQuery {
if (this.isForeignKeysQuery()) {
return data;
}
if (this.isUpsertQuery()) {
return [result, data[0].$action === 'INSERT'];
}
if (this.isInsertQuery() || this.isUpdateQuery()) {
return [result, rowCount];
}
Expand Down
11 changes: 0 additions & 11 deletions lib/dialects/mysql/query-generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -293,17 +293,6 @@ class MySQLQueryGenerator extends AbstractQueryGenerator {
return value;
}

upsertQuery(tableName, insertValues, updateValues, where, model, options) {
options.onDuplicate = 'UPDATE ';

options.onDuplicate += Object.keys(updateValues).map(key => {
key = this.quoteIdentifier(key);
return `${key}=VALUES(${key})`;
}).join(', ');

return this.insertQuery(tableName, insertValues, model.rawAttributes, options);
}

truncateTableQuery(tableName) {
return `TRUNCATE ${this.quoteTable(tableName)}`;
}
Expand Down
22 changes: 15 additions & 7 deletions lib/dialects/mysql/query-interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const sequelizeErrors = require('../../errors');
const { QueryInterface } = require('../abstract/query-interface');
const QueryTypes = require('../../query-types');

/**
* The interface that Sequelize uses to talk with MySQL/MariaDB database
Expand Down Expand Up @@ -37,6 +38,20 @@ class MySQLQueryInterface extends QueryInterface {
);
}

/**
* @override
*/
async upsert(tableName, insertValues, updateValues, where, options) {
options = { ...options };

options.type = QueryTypes.UPSERT;
options.updateOnDuplicate = Object.keys(updateValues);

const model = options.model;
const sql = this.queryGenerator.insertQuery(tableName, insertValues, model.rawAttributes, options);
return await this.sequelize.query(sql, options);
}

/**
* @override
*/
Expand Down Expand Up @@ -69,13 +84,6 @@ class MySQLQueryInterface extends QueryInterface {

return await this.sequelize.query(query, options);
}

/**
* @override
*/
_convertUpsertResult(result) {
return [result === 1, undefined];
}
}

exports.MySQLQueryInterface = MySQLQueryInterface;
5 changes: 4 additions & 1 deletion lib/dialects/mysql/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class Query extends AbstractQuery {
if (this.isCallQuery()) {
return data[0];
}
if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery() || this.isUpsertQuery()) {
if (this.isBulkUpdateQuery() || this.isBulkDeleteQuery()) {
return data.affectedRows;
}
if (this.isVersionQuery()) {
Expand All @@ -147,6 +147,9 @@ class Query extends AbstractQuery {
if (this.isForeignKeysQuery()) {
return data;
}
if (this.isUpsertQuery()) {
return [result, data.affectedRows === 1];
}
if (this.isInsertQuery() || this.isUpdateQuery()) {
return [result, data.affectedRows];
}
Expand Down

0 comments on commit 4d9165b

Please sign in to comment.