Skip to content

Commit

Permalink
me: implement multi-schema reset on postgres (#3459)
Browse files Browse the repository at this point in the history
* me: implement multi-schema reset on postgres

Before this commit, the migration engine `reset` command resets the
schema in the search path, on postgresql and cockroachdb.

This is not the expected behaviour when working with multiple schemas
defined in the `schemas` datasource property: users expect all schemas
to be reset, moreover migrate dev will be broken if we do not do that,
because it expects reset to do its job before re-applying migrations.

In this commit, we take into account the namespaces defined in the
Prisma schema the migration engine is initialized with. We reset these
schemas, when defined, *in addition to* the schema in the search path.
The reason we still the search path into account is because that is
where we create and expect the _prisma_migrations table to live. I
expect we may want to revisit that design choice
(issue: prisma/prisma#16565).

closes prisma/prisma#16561

* me: change how multiSchema reset works

In multi-schema mode, instead of wiping out the schema from the search
path in addition to the schemas defined in the datasource, we now only
drop the migrations table in the main schema. This should match migrate
dev's expectations around resets.
  • Loading branch information
tomhoule committed Dec 2, 2022
1 parent a35ef59 commit f8add13
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 31 deletions.
3 changes: 1 addition & 2 deletions libs/test-cli/src/main.rs
Expand Up @@ -236,8 +236,7 @@ async fn main() -> anyhow::Result<()> {
let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?;
let api = migration_core::migration_api(Some(schema), None)?;

// TODO(MultiSchema): Perhaps read namespaces from the schema somehow.
api.reset(None).await?;
api.reset().await?;
}
Command::CreateDatabase(cmd) => {
let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?;
Expand Down
Expand Up @@ -28,3 +28,12 @@ impl Namespaces {
}
}
}

impl IntoIterator for Namespaces {
type Item = String;
type IntoIter = std::iter::Chain<std::iter::Once<String>, <Vec<String> as IntoIterator>::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
std::iter::once(self.0).chain(self.1.into_iter())
}
}
Expand Up @@ -241,7 +241,7 @@ pub(crate) trait SqlFlavour:
fn raw_cmd<'a>(&'a mut self, sql: &'a str) -> BoxFuture<'a, ConnectorResult<()>>;

/// Drop the database and recreate it empty.
fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;
fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>>;

/// Optionally scan a migration script that could have been altered by users and emit warnings.
fn scan_migration_script(&self, _script: &str) {}
Expand Down
Expand Up @@ -216,7 +216,7 @@ impl SqlFlavour for MssqlFlavour {
})
}

fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(&mut self.state, move |params, connection| async move {
let schema_name = params.url.schema();

Expand Down Expand Up @@ -402,7 +402,7 @@ impl SqlFlavour for MssqlFlavour {
shadow_database.set_params(shadow_db_params)?;
shadow_database.ensure_connection_validity().await?;

if shadow_database.reset().await.is_err() {
if shadow_database.reset(None).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces).await?;
}

Expand Down
Expand Up @@ -231,7 +231,7 @@ impl SqlFlavour for MysqlFlavour {
})
}

fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(&mut self.state, move |params, circumstances, connection| async move {
if circumstances.contains(Circumstances::IsVitess) {
return Err(ConnectorError::from_msg(
Expand Down Expand Up @@ -307,7 +307,7 @@ impl SqlFlavour for MysqlFlavour {
shadow_database.ensure_connection_validity().await?;

tracing::info!("Connecting to user-provided shadow database.");
if shadow_database.reset().await.is_err() {
if shadow_database.reset(None).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces).await?;
}

Expand Down
Expand Up @@ -10,7 +10,7 @@ use migration_connector::{
};
use quaint::connector::PostgresUrl;
use sql_schema_describer::SqlSchema;
use std::{collections::HashMap, future, time};
use std::{borrow::Cow, collections::HashMap, future, time};
use url::Url;
use user_facing_errors::{
common::{DatabaseAccessDenied, DatabaseDoesNotExist},
Expand Down Expand Up @@ -264,14 +264,27 @@ impl SqlFlavour for PostgresFlavour {
})
}

fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(self, move |params, _circumstances, conn| async move {
let schema_name = params.url.schema();
let schemas_to_reset = match namespaces {
Some(ns) => ns.into_iter().map(Cow::Owned).collect(),
None => vec![Cow::Borrowed(params.url.schema())],
};

conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), &params.url)
.await?;
conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), &params.url)
.await?;
tracing::info!(?schemas_to_reset, "Resetting schema(s)");

for schema_name in schemas_to_reset {
conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), &params.url)
.await?;
conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), &params.url)
.await?;
}

// Drop the migrations table in the main schema, otherwise migrate dev will not
// perceive that as a reset, since migrations are still marked as applied.
//
// We don't care if this fails.
conn.raw_cmd("DROP TABLE _prisma_migrations", &params.url).await.ok();

Ok(())
})
Expand Down Expand Up @@ -342,7 +355,7 @@ impl SqlFlavour for PostgresFlavour {

tracing::info!("Connecting to user-provided shadow database.");

if shadow_database.reset().await.is_err() {
if shadow_database.reset(namespaces.clone()).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces.clone()).await?;
}

Expand Down
Expand Up @@ -245,7 +245,7 @@ impl SqlFlavour for SqliteFlavour {
ready(with_connection(&mut self.state, |_, conn| conn.raw_cmd(sql)))
}

fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
ready(with_connection(&mut self.state, move |params, connection| {
let file_path = &params.file_path;

Expand Down
Expand Up @@ -262,7 +262,7 @@ impl MigrationConnector for SqlMigrationConnector {

fn reset(&mut self, soft: bool, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
Box::pin(async move {
if soft || self.flavour.reset().await.is_err() {
if soft || self.flavour.reset(namespaces.clone()).await.is_err() {
best_effort_reset(self.flavour.as_mut(), namespaces).await?;
}

Expand Down
4 changes: 1 addition & 3 deletions migration-engine/core/src/api.rs
@@ -1,7 +1,5 @@
//! The external facing programmatic API to the migration engine.

use migration_connector::Namespaces;

use crate::{commands, json_rpc::types::*, CoreResult};

/// The programmatic, generic, fantastic migration engine API.
Expand Down Expand Up @@ -70,7 +68,7 @@ pub trait GenericApi: Send + Sync + 'static {
) -> CoreResult<MarkMigrationRolledBackOutput>;

/// Reset a database to an empty state (no data, no schema).
async fn reset(&self, namespaces: Option<Namespaces>) -> CoreResult<()>;
async fn reset(&self) -> CoreResult<()>;

/// The command behind `prisma db push`.
async fn schema_push(&self, input: SchemaPushInput) -> CoreResult<SchemaPushOutput>;
Expand Down
6 changes: 3 additions & 3 deletions migration-engine/core/src/rpc.rs
Expand Up @@ -3,9 +3,9 @@ use jsonrpc_core::{types::error::Error as JsonRpcError, IoHandler, Params};
use std::sync::Arc;

/// Initialize a JSON-RPC ready migration engine API.
pub fn rpc_api(datamodel: Option<String>, host: Arc<dyn migration_connector::ConnectorHost>) -> IoHandler {
pub fn rpc_api(prisma_schema: Option<String>, host: Arc<dyn migration_connector::ConnectorHost>) -> IoHandler {
let mut io_handler = IoHandler::default();
let api = Arc::new(crate::state::EngineState::new(datamodel, Some(host)));
let api = Arc::new(crate::state::EngineState::new(prisma_schema, Some(host)));

for cmd in METHOD_NAMES {
let api = api.clone();
Expand Down Expand Up @@ -41,7 +41,7 @@ async fn run_command(
MARK_MIGRATION_APPLIED => render(executor.mark_migration_applied(params.parse()?).await),
MARK_MIGRATION_ROLLED_BACK => render(executor.mark_migration_rolled_back(params.parse()?).await),
// TODO(MultiSchema): we probably need to grab the namespaces from the params
RESET => render(executor.reset(None).await),
RESET => render(executor.reset().await),
SCHEMA_PUSH => render(executor.schema_push(params.parse()?).await),
other => unreachable!("Unknown command {}", other),
}
Expand Down
17 changes: 13 additions & 4 deletions migration-engine/core/src/state.rs
Expand Up @@ -19,7 +19,7 @@ use tracing_futures::Instrument;
/// channels. That ensures that each connector is handling requests one at a time to avoid
/// synchronization issues. You can think of it in terms of the actor model.
pub(crate) struct EngineState {
initial_datamodel: Option<String>,
initial_datamodel: Option<psl::ValidatedSchema>,
host: Arc<dyn ConnectorHost>,
// A map from either:
//
Expand All @@ -46,7 +46,7 @@ type ErasedConnectorRequest = Box<
impl EngineState {
pub(crate) fn new(initial_datamodel: Option<String>, host: Option<Arc<dyn ConnectorHost>>) -> Self {
EngineState {
initial_datamodel,
initial_datamodel: initial_datamodel.map(|s| psl::validate(s.into())),
host: host.unwrap_or_else(|| Arc::new(migration_connector::EmptyHost)),
connectors: Default::default(),
}
Expand Down Expand Up @@ -170,7 +170,7 @@ impl EngineState {
return Err(ConnectorError::from_msg("Missing --datamodel".to_owned()));
};

self.with_connector_for_schema(schema, None, f).await
self.with_connector_for_schema(schema.db.source(), None, f).await
}
}

Expand Down Expand Up @@ -366,9 +366,18 @@ impl GenericApi for EngineState {
.await
}

async fn reset(&self, namespaces: Option<Namespaces>) -> CoreResult<()> {
async fn reset(&self) -> CoreResult<()> {
tracing::debug!("Resetting the database.");

let namespaces: Option<Namespaces> = self
.initial_datamodel
.as_ref()
.and_then(|schema| schema.configuration.datasources.first())
.and_then(|ds| {
let mut names = ds.namespaces.iter().map(|(ns, _)| ns.to_owned()).collect();
Namespaces::from_vec(&mut names)
});

self.with_default_connector(Box::new(move |connector| {
Box::pin(MigrationConnector::reset(connector, false, namespaces).instrument(tracing::info_span!("Reset")))
}))
Expand Down
1 change: 1 addition & 0 deletions migration-engine/migration-engine-tests/src/assertions.rs
Expand Up @@ -71,6 +71,7 @@ impl SchemaAssertion {
}
}

#[track_caller]
fn assert_error(&self, table_name: &str, positive: bool) -> ! {
let method = if positive {
"assert_table"
Expand Down
9 changes: 6 additions & 3 deletions migration-engine/migration-engine-tests/src/test_api.rs
@@ -1,7 +1,10 @@
pub use crate::assertions::{MigrationsAssertions, ResultSetExt, SchemaAssertion};
pub use expect_test::expect;
pub use migration_core::json_rpc::types::{
DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer,
pub use migration_core::{
json_rpc::types::{
DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer,
},
migration_connector::Namespaces,
};
pub use test_macros::test_connector;
pub use test_setup::{runtime::run_with_thread_local_runtime as tok, BitFlags, Capabilities, Tags};
Expand All @@ -10,7 +13,7 @@ use crate::{commands::*, multi_engine_test_api::TestApi as RootTestApi};
use migration_core::{
commands::diff,
migration_connector::{
BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence, Namespaces,
BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence,
},
};
use psl::parser_database::SourceFile;
Expand Down
Expand Up @@ -93,3 +93,63 @@ fn reset_then_diagnostics_with_migrations_directory_works(api: TestApi) {
.assert_has_table("Cat")
.assert_has_table("_prisma_migrations");
}

#[test_connector(
tags(Postgres),
exclude(CockroachDb),
namespaces("felines", "rodents"),
preview_features("multiSchema")
)]
fn multi_schema_reset(mut api: TestApi) {
let prisma_schema = format! {
r#"
{}
generator js {{
provider = "prisma-client-js"
previewFeatures = ["multiSchema"]
}}
model Manul {{
id Int @id
@@schema("felines")
}}
model Capybara {{
id Int @id
@@schema("rodents")
}}
"#, api.datasource_block_with(&[("schemas", r#"["felines", "rodents"]"#)])
};
api.schema_push(&prisma_schema)
.send()
.assert_green()
.assert_has_executed_steps();

let migrations_dir = api.create_migrations_directory();
api.apply_migrations(&migrations_dir).send_sync();
api.raw_cmd("CREATE TABLE randomTable (id INTEGER PRIMARY KEY);");

let all_namespaces = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into(), api.schema_name()]);
let namespaces_in_psl = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into()]);

api.assert_schema_with_namespaces(all_namespaces.clone())
.assert_has_table("randomtable")
.assert_has_table("_prisma_migrations")
.assert_has_table("Manul")
.assert_has_table("Capybara");

api.reset().send_sync(namespaces_in_psl);

api.assert_schema_with_namespaces(all_namespaces)
.assert_has_table("randomtable") // we do not want to wipe the schema from search_path
.assert_has_no_table("_prisma_migrations")
.assert_has_no_table("Manul")
.assert_has_no_table("Capybara");

// Check that we can migrate from there.
api.schema_push(&prisma_schema)
.send()
.assert_green()
.assert_has_executed_steps();
}
2 changes: 1 addition & 1 deletion migration-engine/qe-setup/src/mssql.rs
Expand Up @@ -22,7 +22,7 @@ pub(crate) async fn mssql_setup(url: String, prisma_schema: &str, db_schemas: &[
conn.raw_cmd(&format!("USE [{db_name}];")).await.unwrap();
} else {
let api = migration_core::migration_api(Some(prisma_schema.to_owned()), None)?;
api.reset(None).await.ok();
api.reset().await.ok();
// Without these, our poor connection gets deadlocks if other schemas
// are modified while we introspect.
let allow_snapshot_isolation = format!(
Expand Down

0 comments on commit f8add13

Please sign in to comment.