Use separate PostgreSQL schemas for sprocs (#4412)
* Add schema support to sql-db

* Add random schema setting

* Fix function definitions

* Fix substring() name

* Change replaceAll() -> replace()

* Fix module.export(s)

* Change to underscores for schema name

* Actually create the new schema

* Fix an actual bug found by the linter!

* Only create schema if it doesn't exist

* Use client.escapeIdentifier() for schema

* Document use of done()

* Document length limit of schema prefix

* Fix setting with null schema

* Fix schema prefix truncation

* Fix to create non-null schema before using in search_path

* Shift new functions to end-of-file

* Force schema name to lowercase for convenience with psql

* Fix sprocs/array_and_number.sql to not check EXISTS

* Rewrite query() to use queryWithClient()

* Add docs and comments about our schema system

* Clarify "local" in the docs

Co-authored-by: Nathan Walters <nathan@prairielearn.com>

* Rewrite type creation comment

Co-authored-by: Nathan Walters <nathan@prairielearn.com>

* More comment updates about type creation

* Change all sprocs to plain CREATE

* Fix tests to actually use setRandomSearchSchema()

* Remove sprocs/random_string.sql because migrations make it in public schema

* Use more character types in setRandomSearchSchema()

Co-authored-by: Nathan Walters <nathan@prairielearn.com>
mwest1066 and nwalters512 committed Jul 7, 2021
1 parent 27e279f commit d5c37b3
Showing 158 changed files with 304 additions and 422 deletions.
27 changes: 27 additions & 0 deletions docs/dev-guide.md
@@ -275,6 +275,33 @@ FROM
```


## DB stored procedures (sprocs)

* Stored procedures are created by the files in `sprocs/`. To call a stored procedure from JavaScript, use code like:

```javascript
const workspace_id = 1342;
const message = 'Startup successful';
sqldb.call('workspaces_message_update', [workspace_id, message], (err, result) => {
if (ERR(err, callback)) return;
// we could use the result here if we want the return value of the stored procedure
callback(null);
});
```

* The stored procedures are all contained in a separate [database schema](https://www.postgresql.org/docs/12/ddl-schemas.html) with a name like `server_2021-07-07T20:25:04.779Z_T75V6Y`. To see a list of the schemas use the `\dn` command in `psql`.

* To use the stored procedures from the `psql` command line, first find the most recent schema name with `\dn`, then set the `search_path` to this _quoted_ schema name followed by the `public` schema:

```sql
set search_path to "server_2021-07-07T20:25:04.779Z_T75V6Y",public;
```

* During startup we initially have no non-public schema in use. We first run the migrations to update all tables in the `public` schema, then call `sqldb.setRandomSearchSchema()` to activate a random per-execution schema, and finally run the sproc-creation code so that all stored procedures land in this schema (see the sketch after this list). This means that every invocation of PrairieLearn has its own locally-scoped copy of the stored procedures, matching the version of its code, which lets us upgrade PrairieLearn servers one at a time while old servers keep running with their own copies of their sprocs. When PrairieLearn first starts up it has `search_path = public`, but later it has `search_path = "server_2021-07-07T20:25:04.779Z_T75V6Y",public`, so it first searches the random schema and then falls back to `public`. The random schema name is built from the local instance name, the date, and a random string. Note that schema names need to be double-quoted in `psql` because they contain characters such as hyphens.

* For more details see `sprocs/array_and_number.sql` and comments in `server.js` near the call to `sqldb.setRandomSearchSchema()`.
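A minimal sketch of that startup ordering (illustrative only, not part of this commit; `migrations.init()` and the `require` paths are assumptions, while `sprocs.init()` and `sqldb.setRandomSearchSchema()` are the entry points actually used in `server.js`):

```javascript
const async = require('async');
const sqldb = require('./prairielib/lib/sql-db');
const sprocs = require('./sprocs');
const migrations = require('./migrations'); // hypothetical module name
const config = { instanceId: 'server' };    // normally loaded from the PrairieLearn config

async.series([
    // 1. Run migrations first: all tables are created/updated in the public schema.
    (callback) => migrations.init(callback),
    // 2. Create and activate a random per-execution schema.
    (callback) => {
        sqldb.setRandomSearchSchema(config.instanceId, (err, schema) => {
            if (err) return callback(err);
            console.log(`sprocs will live in schema "${schema}"`);
            callback(null);
        });
    },
    // 3. Create the sprocs; they land in the random schema, not in public.
    (callback) => sprocs.init(callback),
], (err) => {
    if (err) throw err;
});
```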


## DB schema (simplified overview)

* The most important tables in the database are shown in the diagram below (also as a [PDF image](simplified-models.pdf)).
108 changes: 82 additions & 26 deletions prairielib/lib/sql-db.js
@@ -7,6 +7,8 @@ const { promisify } = require('util');

const error = require('./error');

let searchSchema = null;

/**
* Formats a string for debugging.
*
@@ -165,7 +167,11 @@ module.exports.close = function(callback) {
module.exports.closeAsync = promisify(module.exports.close);

/**
* Gets a new client from the connection pool.
* Gets a new client from the connection pool. If `err` is not null
* then `client` and `done` are undefined. If `err` is null then
* `client` is valid and can be used. The caller MUST call
 * `done(client)` to release the client, whether or not errors occurred
* while using `client`.
*
* @param {(error: Error | null, client: import("pg").PoolClient, done: (release?: any) => void) => void} callback
*/
@@ -180,7 +186,18 @@ module.exports.getClient = function(callback) {
}
return ERR(err, callback); // unconditionally return
}
callback(null, client, done);
if (searchSchema != null) {
const setSearchPathSql = `SET search_path TO ${client.escapeIdentifier(searchSchema)},public`;
module.exports.queryWithClient(client, setSearchPathSql, {}, (err) => {
if (err) {
done(client);
return ERR(err, callback); // unconditionally return
}
callback(null, client, done);
});
} else {
callback(null, client, done);
}
});
};
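A usage sketch of the `getClient()`/`done()` contract documented above (a sketch only, not from this commit; the query text is a placeholder and `sqldb` stands for this module as seen by callers):

```javascript
sqldb.getClient((err, client, done) => {
    if (err) {
        // `client` and `done` are undefined here, so there is nothing to release
        return console.error('getClient failed', err);
    }
    sqldb.queryWithClient(client, 'SELECT 33 AS val;', {}, (err, result) => {
        done(client); // always release the client, even if the query errored
        if (err) return console.error('query failed', err);
        console.log('val =', result.rows[0].val);
    });
});
```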

@@ -403,31 +420,12 @@ module.exports.endTransactionAsync = promisify(module.exports.endTransaction);
module.exports.query = function(sql, params, callback) {
debug('query()', 'sql:', debugString(sql));
debug('query()', 'params:', debugParams(params));
if (!pool) {
return callback(new Error('Connection pool is not open'));
}
pool.connect(function(err, client, done) {
const handleError = function(err) {
if (!err) return false;
if (client) {
done(client);
}
const sqlError = JSON.parse(JSON.stringify(err));
sqlError.message = err.message;
err = error.addData(err, {sqlError: sqlError, sql: sql, sqlParams: params});
ERR(err, callback);
return true;
};
if (handleError(err)) return;
paramsToArray(sql, params, function(err, newSql, newParams) {
if (err) err = error.addData(err, {sql: sql, sqlParams: params});
module.exports.getClient((err, client, done) => {
if (ERR(err, callback)) return;
module.exports.queryWithClient(client, sql, params, (err, result) => {
done(client);
if (ERR(err, callback)) return;
client.query(newSql, newParams, function(err, result) {
if (handleError(err)) return;
done();
debug('query() success', 'rowCount:', result.rowCount);
callback(null, result);
});
callback(null, result);
});
});
};
@@ -659,3 +657,61 @@ module.exports.callWithClientZeroOrOneRow = function(client, functionName, param
* Errors if the function returns more than one row.
*/
module.exports.callWithClientZeroOrOneRowAsync = promisify(module.exports.callWithClientZeroOrOneRow);

/**
* Set the schema to use for the search path.
*
 * @param {string | null} schema - The schema name to use (or `null` to unset the search path)
* @param {(error: Error | null) => void} callback
*/
module.exports.setSearchSchema = function(schema, callback) {
if (schema == null) {
searchSchema = schema;
        return callback(null);
}
/* Note that as of 2021-06-29 escapeIdentifier() is undocumented. See:
* https://github.com/brianc/node-postgres/pull/396
* https://github.com/brianc/node-postgres/issues/1978
* https://www.postgresql.org/docs/12/sql-syntax-lexical.html
*/
module.exports.query(`CREATE SCHEMA IF NOT EXISTS ${pg.Client.prototype.escapeIdentifier(schema)}`, [], (err) => {
if (ERR(err, callback)) return;
// we only set searchSchema after CREATE to avoid the above query() call using searchSchema
searchSchema = schema;
callback(null);
});
};

/**
* Get the schema that is currently used for the search path.
*
 * @return {string | null} schema in use (may be `null` to indicate no schema)
*/
module.exports.getSearchSchema = function() {
return searchSchema;
};

/**
* Generate, set, and return a random schema name.
*
* @param {string} prefix - The prefix of the new schema, only the first 28 characters will be used (after lowercasing).
* @param {(error: Error | null, schema: String) => void} callback
*/
module.exports.setRandomSearchSchema = function(prefix, callback) {
// truncated prefix (max 28 characters)
const truncPrefix = prefix.substring(0, 28);
// timestamp in format YYYY-MM-DDTHH:MM:SS.SSSZ (guaranteed to not exceed 27 characters in the spec)
const timestamp = (new Date()).toISOString();
// random 6-character suffix to avoid clashes (approx 2 billion possible values)
const chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'.split('');
const suffix = _.times(6, function() {return _.sample(chars);}).join('');

// Schema is guaranteed to have length at most 63 (= 28 + 1 + 27 + 1 + 6),
// which is the default PostgreSQL identifier limit.
// Note that this schema name will need quoting because of characters like ':', '-', etc
const schema = `${truncPrefix}_${timestamp}_${suffix}`;
module.exports.setSearchSchema(schema, (err) => {
if (ERR(err, callback)) return;
callback(null, schema);
});
};
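A sketch of how the new helpers combine, for example from a test setup (a sketch only, not from this commit; the `'test'` prefix and the logging are assumptions):

```javascript
// assumes sqldb.init(...) has already opened the connection pool
sqldb.setRandomSearchSchema('test', (err, schema) => {
    if (err) throw err;
    // schema looks like: test_2021-07-07T20:25:04.779Z_T75V6Y
    console.log('active search schema:', schema, '===', sqldb.getSearchSchema());
    // every client handed out by getClient() will now run
    //   SET search_path TO "<schema>",public
    // before it is returned to the caller
});
```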
15 changes: 15 additions & 0 deletions server.js
@@ -1136,6 +1136,21 @@ if (config.startServer) {
callback(null);
});
},
function(callback) {
// We create and activate a random DB schema name
// (https://www.postgresql.org/docs/12/ddl-schemas.html)
// after we have run the migrations but before we create
// the sprocs. This means all tables (from migrations) are
// in the public schema, but all sprocs are in the random
// schema. Every server invocation thus has its own copy
// of its sprocs, allowing us to update servers while old
// servers are still running. See docs/dev-guide.md for
// more info.
sqldb.setRandomSearchSchema(config.instanceId, (err) => {
if (ERR(err, callback)) return;
callback(null);
});
},
function(callback) {
sprocs.init(function(err) {
if (ERR(err, callback)) return;
2 changes: 1 addition & 1 deletion sprocs/access_tokens_delete.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
access_tokens_delete(
id bigint,
user_id bigint
2 changes: 1 addition & 1 deletion sprocs/access_tokens_insert.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
access_tokens_insert(
user_id bigint,
name text,
2 changes: 1 addition & 1 deletion sprocs/admin_assessment_question_number.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
admin_assessment_question_number (
assessment_question_id bigint
) RETURNS text
2 changes: 1 addition & 1 deletion sprocs/administrators_delete_by_user_id.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
administrators_delete_by_user_id(
user_id bigint,
authn_user_id bigint
2 changes: 1 addition & 1 deletion sprocs/administrators_insert_by_user_uid.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
administrators_insert_by_user_uid(
uid text,
authn_user_id bigint
17 changes: 10 additions & 7 deletions sprocs/array_and_number.sql
@@ -1,7 +1,10 @@
--create types
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'array_and_number') THEN
CREATE TYPE array_and_number AS (arr DOUBLE PRECISION[], number INTEGER);
END IF;
END$$;
-- Ideally, we'd use something like `CREATE TYPE IF NOT EXISTS` here, but Postgres doesn't offer that.
-- We used to approximate this by checking `IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = ...)`.
-- However, now that we're creating sprocs and types in a local per-process schema, this would no longer work,
-- as the type would exist in `pg_type` but not necessarily in the schemas on the search path.
-- So, instead, we blindly create the type without checking if it first exists, as it should never exist since we're
-- creating it in a fresh, empty schema. If we needed to check, a good query is:
-- DO $$ BEGIN CREATE TYPE my_type AS (...); EXCEPTION WHEN duplicate_object THEN null; END $$;
-- See https://stackoverflow.com/questions/7624919/check-if-a-user-defined-type-already-exists-in-postgresql/48382296#48382296

CREATE TYPE array_and_number AS (arr DOUBLE PRECISION[], number INTEGER);
5 changes: 2 additions & 3 deletions sprocs/array_avg.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
array_avg_sfunc (
state array_and_number,
input DOUBLE PRECISION[]
@@ -16,7 +16,7 @@ BEGIN
END;
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE FUNCTION
CREATE FUNCTION
array_avg_ffunc (
state array_and_number
) RETURNS DOUBLE PRECISION[] AS $$
@@ -29,7 +29,6 @@ BEGIN
END;
$$ LANGUAGE plpgsql IMMUTABLE;

DROP AGGREGATE IF EXISTS array_avg (DOUBLE PRECISION[]) CASCADE;
CREATE AGGREGATE array_avg (DOUBLE PRECISION[]) (
SFUNC = array_avg_sfunc,
STYPE = array_and_number,
2 changes: 1 addition & 1 deletion sprocs/array_dot.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
array_dot(
IN x DOUBLE PRECISION[],
IN y DOUBLE PRECISION[],
3 changes: 1 addition & 2 deletions sprocs/array_histogram.sql
@@ -1,7 +1,7 @@

-- Based on: https://wiki.postgresql.org/wiki/Aggregate_Histogram

CREATE OR REPLACE FUNCTION
CREATE FUNCTION
array_histogram_sfunc (state INTEGER[], val anyelement, thresholds anyarray) RETURNS INTEGER[] AS $$
DECLARE
nbuckets INTEGER;
@@ -28,7 +28,6 @@ END;
$$ LANGUAGE plpgsql IMMUTABLE;

-- Tell Postgres how to use the new function
DROP AGGREGATE IF EXISTS array_histogram (anyelement, anyarray) CASCADE;
CREATE AGGREGATE array_histogram (anyelement, anyarray) (
SFUNC = array_histogram_sfunc,
STYPE = INTEGER[]
2 changes: 1 addition & 1 deletion sprocs/array_increments_above_max.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
array_increments_above_max(
IN data double precision[],
OUT increments double precision[]
2 changes: 1 addition & 1 deletion sprocs/array_product.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
array_product(
IN x DOUBLE PRECISION[],
IN y DOUBLE PRECISION[],
17 changes: 6 additions & 11 deletions sprocs/array_var.sql
@@ -1,10 +1,9 @@
----------------------------------- ONLINE MEAN -----------------------------------
-- https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm

DROP TYPE IF EXISTS mean_and_index CASCADE;
CREATE TYPE mean_and_index AS (mean DOUBLE PRECISION, index DOUBLE PRECISION);

CREATE OR REPLACE FUNCTION online_mean_sfunc (
CREATE FUNCTION online_mean_sfunc (
prev_mean_and_index mean_and_index,
input DOUBLE PRECISION
) RETURNS mean_and_index AS $$
@@ -27,15 +26,14 @@ BEGIN
END;
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE FUNCTION online_mean_ffunc(
CREATE FUNCTION online_mean_ffunc(
mean_and_index mean_and_index
) RETURNS DOUBLE PRECISION AS $$
BEGIN
RETURN mean_and_index.mean;
END;
$$ LANGUAGE plpgsql IMMUTABLE;

DROP AGGREGATE IF EXISTS online_mean (DOUBLE PRECISION) CASCADE;
CREATE AGGREGATE online_mean (DOUBLE PRECISION) (
STYPE = mean_and_index,
SFUNC = online_mean_sfunc,
@@ -45,10 +43,9 @@ CREATE AGGREGATE online_mean (DOUBLE PRECISION) (
-------------------------------- ONLINE VARIANCE --------------------------------
-- https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm

DROP TYPE IF EXISTS mean_and_var_and_index CASCADE;
CREATE TYPE mean_and_var_and_index AS (mean DOUBLE PRECISION, variance DOUBLE PRECISION, index DOUBLE PRECISION);

CREATE OR REPLACE FUNCTION online_var_sfunc (
CREATE FUNCTION online_var_sfunc (
prev_state mean_and_var_and_index,
input DOUBLE PRECISION
) RETURNS mean_and_var_and_index AS $$
@@ -75,15 +72,14 @@ BEGIN
END;
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE FUNCTION online_var_ffunc(
CREATE FUNCTION online_var_ffunc(
state mean_and_var_and_index
) RETURNS DOUBLE PRECISION AS $$
BEGIN
RETURN state.variance;
END;
$$ LANGUAGE plpgsql IMMUTABLE;

DROP AGGREGATE IF EXISTS online_var (DOUBLE PRECISION) CASCADE;
CREATE AGGREGATE online_var (DOUBLE PRECISION) (
STYPE = mean_and_var_and_index,
SFUNC = online_var_sfunc,
@@ -93,7 +89,7 @@ CREATE AGGREGATE online_var (DOUBLE PRECISION) (
------------------------------ ONLINE ARRAY VARIANCE ------------------------------
-- https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm

CREATE OR REPLACE FUNCTION array_var_sfunc (
CREATE FUNCTION array_var_sfunc (
prev_state mean_and_var_and_index[],
input DOUBLE PRECISION[]
) RETURNS mean_and_var_and_index[] AS $$
@@ -118,7 +114,7 @@ BEGIN
END;
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE FUNCTION array_var_ffunc(
CREATE FUNCTION array_var_ffunc(
state mean_and_var_and_index[]
) RETURNS DOUBLE PRECISION[] AS $$
DECLARE
@@ -136,7 +132,6 @@ BEGIN
END;
$$ LANGUAGE plpgsql IMMUTABLE;

DROP AGGREGATE IF EXISTS array_var (DOUBLE PRECISION[]) CASCADE;
CREATE AGGREGATE array_var (DOUBLE PRECISION[]) (
STYPE = mean_and_var_and_index[],
SFUNC = array_var_sfunc,
3 changes: 1 addition & 2 deletions sprocs/assessment_groups_add_member.sql
@@ -1,5 +1,4 @@
DROP FUNCTION IF EXISTS assessment_groups_add_member(bigint,bigint,text,bigint);
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
assessment_groups_add_member(
assessment_id bigint,
arg_group_id bigint,
2 changes: 1 addition & 1 deletion sprocs/assessment_groups_copy.sql
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION
CREATE FUNCTION
assessment_groups_copy(
assessment_id bigint,
copying_assessment_id bigint,
