From f8add13e95bdaba353898131a2675da34f03f3a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=20Houl=C3=A9?= <13155277+tomhoule@users.noreply.github.com> Date: Fri, 2 Dec 2022 14:49:47 +0100 Subject: [PATCH] me: implement multi-schema reset on postgres (#3459) * me: implement multi-schema reset on postgres Before this commit, the migration engine `reset` command resets the schema in the search path, on postgresql and cockroachdb. This is not the expected behaviour when working with multiple schemas defined in the `schemas` datasource property: users expect all schemas to be reset, moreover migrate dev will be broken if we do not do that, because it expects reset to do its job before re-applying migrations. In this commit, we take into account the namespaces defined in the Prisma schema the migration engine is initialized with. We reset these schemas, when defined, *in addition to* the schema in the search path. The reason we still the search path into account is because that is where we create and expect the _prisma_migrations table to live. I expect we may want to revisit that design choice (issue: https://github.com/prisma/prisma/issues/16565). closes https://github.com/prisma/prisma/issues/16561 * me: change how multiSchema reset works In multi-schema mode, instead of wiping out the schema from the search path in addition to the schemas defined in the datasource, we now only drop the migrations table in the main schema. This should match migrate dev's expectations around resets. --- libs/test-cli/src/main.rs | 3 +- .../migration-connector/src/namespaces.rs | 9 +++ .../sql-migration-connector/src/flavour.rs | 2 +- .../src/flavour/mssql.rs | 4 +- .../src/flavour/mysql.rs | 4 +- .../src/flavour/postgres.rs | 29 ++++++--- .../src/flavour/sqlite.rs | 2 +- .../sql-migration-connector/src/lib.rs | 2 +- migration-engine/core/src/api.rs | 4 +- migration-engine/core/src/rpc.rs | 6 +- migration-engine/core/src/state.rs | 17 ++++-- .../migration-engine-tests/src/assertions.rs | 1 + .../migration-engine-tests/src/test_api.rs | 9 ++- .../tests/migrations/reset_tests.rs | 60 +++++++++++++++++++ migration-engine/qe-setup/src/mssql.rs | 2 +- 15 files changed, 123 insertions(+), 31 deletions(-) diff --git a/libs/test-cli/src/main.rs b/libs/test-cli/src/main.rs index 87320ad22989..40afe1c05cd3 100644 --- a/libs/test-cli/src/main.rs +++ b/libs/test-cli/src/main.rs @@ -236,8 +236,7 @@ async fn main() -> anyhow::Result<()> { let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?; let api = migration_core::migration_api(Some(schema), None)?; - // TODO(MultiSchema): Perhaps read namespaces from the schema somehow. - api.reset(None).await?; + api.reset().await?; } Command::CreateDatabase(cmd) => { let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?; diff --git a/migration-engine/connectors/migration-connector/src/namespaces.rs b/migration-engine/connectors/migration-connector/src/namespaces.rs index 9ba48d96c057..0f18c2ecdb2d 100644 --- a/migration-engine/connectors/migration-connector/src/namespaces.rs +++ b/migration-engine/connectors/migration-connector/src/namespaces.rs @@ -28,3 +28,12 @@ impl Namespaces { } } } + +impl IntoIterator for Namespaces { + type Item = String; + type IntoIter = std::iter::Chain, as IntoIterator>::IntoIter>; + + fn into_iter(self) -> Self::IntoIter { + std::iter::once(self.0).chain(self.1.into_iter()) + } +} diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour.rs b/migration-engine/connectors/sql-migration-connector/src/flavour.rs index 1b100134f7a9..7eaf0f23b794 100644 --- a/migration-engine/connectors/sql-migration-connector/src/flavour.rs +++ b/migration-engine/connectors/sql-migration-connector/src/flavour.rs @@ -241,7 +241,7 @@ pub(crate) trait SqlFlavour: fn raw_cmd<'a>(&'a mut self, sql: &'a str) -> BoxFuture<'a, ConnectorResult<()>>; /// Drop the database and recreate it empty. - fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>>; + fn reset(&mut self, namespaces: Option) -> BoxFuture<'_, ConnectorResult<()>>; /// Optionally scan a migration script that could have been altered by users and emit warnings. fn scan_migration_script(&self, _script: &str) {} diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour/mssql.rs b/migration-engine/connectors/sql-migration-connector/src/flavour/mssql.rs index 24ee557668ac..1d1ee32e084c 100644 --- a/migration-engine/connectors/sql-migration-connector/src/flavour/mssql.rs +++ b/migration-engine/connectors/sql-migration-connector/src/flavour/mssql.rs @@ -216,7 +216,7 @@ impl SqlFlavour for MssqlFlavour { }) } - fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> { + fn reset(&mut self, _namespaces: Option) -> BoxFuture<'_, ConnectorResult<()>> { with_connection(&mut self.state, move |params, connection| async move { let schema_name = params.url.schema(); @@ -402,7 +402,7 @@ impl SqlFlavour for MssqlFlavour { shadow_database.set_params(shadow_db_params)?; shadow_database.ensure_connection_validity().await?; - if shadow_database.reset().await.is_err() { + if shadow_database.reset(None).await.is_err() { crate::best_effort_reset(&mut shadow_database, namespaces).await?; } diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour/mysql.rs b/migration-engine/connectors/sql-migration-connector/src/flavour/mysql.rs index 152836e449b8..3ae6d310ad5f 100644 --- a/migration-engine/connectors/sql-migration-connector/src/flavour/mysql.rs +++ b/migration-engine/connectors/sql-migration-connector/src/flavour/mysql.rs @@ -231,7 +231,7 @@ impl SqlFlavour for MysqlFlavour { }) } - fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> { + fn reset(&mut self, _namespaces: Option) -> BoxFuture<'_, ConnectorResult<()>> { with_connection(&mut self.state, move |params, circumstances, connection| async move { if circumstances.contains(Circumstances::IsVitess) { return Err(ConnectorError::from_msg( @@ -307,7 +307,7 @@ impl SqlFlavour for MysqlFlavour { shadow_database.ensure_connection_validity().await?; tracing::info!("Connecting to user-provided shadow database."); - if shadow_database.reset().await.is_err() { + if shadow_database.reset(None).await.is_err() { crate::best_effort_reset(&mut shadow_database, namespaces).await?; } diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour/postgres.rs b/migration-engine/connectors/sql-migration-connector/src/flavour/postgres.rs index 3186a414967b..128ecd536649 100644 --- a/migration-engine/connectors/sql-migration-connector/src/flavour/postgres.rs +++ b/migration-engine/connectors/sql-migration-connector/src/flavour/postgres.rs @@ -10,7 +10,7 @@ use migration_connector::{ }; use quaint::connector::PostgresUrl; use sql_schema_describer::SqlSchema; -use std::{collections::HashMap, future, time}; +use std::{borrow::Cow, collections::HashMap, future, time}; use url::Url; use user_facing_errors::{ common::{DatabaseAccessDenied, DatabaseDoesNotExist}, @@ -264,14 +264,27 @@ impl SqlFlavour for PostgresFlavour { }) } - fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> { + fn reset(&mut self, namespaces: Option) -> BoxFuture<'_, ConnectorResult<()>> { with_connection(self, move |params, _circumstances, conn| async move { - let schema_name = params.url.schema(); + let schemas_to_reset = match namespaces { + Some(ns) => ns.into_iter().map(Cow::Owned).collect(), + None => vec![Cow::Borrowed(params.url.schema())], + }; - conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), ¶ms.url) - .await?; - conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), ¶ms.url) - .await?; + tracing::info!(?schemas_to_reset, "Resetting schema(s)"); + + for schema_name in schemas_to_reset { + conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), ¶ms.url) + .await?; + conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), ¶ms.url) + .await?; + } + + // Drop the migrations table in the main schema, otherwise migrate dev will not + // perceive that as a reset, since migrations are still marked as applied. + // + // We don't care if this fails. + conn.raw_cmd("DROP TABLE _prisma_migrations", ¶ms.url).await.ok(); Ok(()) }) @@ -342,7 +355,7 @@ impl SqlFlavour for PostgresFlavour { tracing::info!("Connecting to user-provided shadow database."); - if shadow_database.reset().await.is_err() { + if shadow_database.reset(namespaces.clone()).await.is_err() { crate::best_effort_reset(&mut shadow_database, namespaces.clone()).await?; } diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour/sqlite.rs b/migration-engine/connectors/sql-migration-connector/src/flavour/sqlite.rs index c775a6d7d52f..a73a299300be 100644 --- a/migration-engine/connectors/sql-migration-connector/src/flavour/sqlite.rs +++ b/migration-engine/connectors/sql-migration-connector/src/flavour/sqlite.rs @@ -245,7 +245,7 @@ impl SqlFlavour for SqliteFlavour { ready(with_connection(&mut self.state, |_, conn| conn.raw_cmd(sql))) } - fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> { + fn reset(&mut self, _namespaces: Option) -> BoxFuture<'_, ConnectorResult<()>> { ready(with_connection(&mut self.state, move |params, connection| { let file_path = ¶ms.file_path; diff --git a/migration-engine/connectors/sql-migration-connector/src/lib.rs b/migration-engine/connectors/sql-migration-connector/src/lib.rs index 925b5e377ecd..4f02986015e0 100644 --- a/migration-engine/connectors/sql-migration-connector/src/lib.rs +++ b/migration-engine/connectors/sql-migration-connector/src/lib.rs @@ -262,7 +262,7 @@ impl MigrationConnector for SqlMigrationConnector { fn reset(&mut self, soft: bool, namespaces: Option) -> BoxFuture<'_, ConnectorResult<()>> { Box::pin(async move { - if soft || self.flavour.reset().await.is_err() { + if soft || self.flavour.reset(namespaces.clone()).await.is_err() { best_effort_reset(self.flavour.as_mut(), namespaces).await?; } diff --git a/migration-engine/core/src/api.rs b/migration-engine/core/src/api.rs index 81050d947fc0..89fdaf3e80a6 100644 --- a/migration-engine/core/src/api.rs +++ b/migration-engine/core/src/api.rs @@ -1,7 +1,5 @@ //! The external facing programmatic API to the migration engine. -use migration_connector::Namespaces; - use crate::{commands, json_rpc::types::*, CoreResult}; /// The programmatic, generic, fantastic migration engine API. @@ -70,7 +68,7 @@ pub trait GenericApi: Send + Sync + 'static { ) -> CoreResult; /// Reset a database to an empty state (no data, no schema). - async fn reset(&self, namespaces: Option) -> CoreResult<()>; + async fn reset(&self) -> CoreResult<()>; /// The command behind `prisma db push`. async fn schema_push(&self, input: SchemaPushInput) -> CoreResult; diff --git a/migration-engine/core/src/rpc.rs b/migration-engine/core/src/rpc.rs index 05b325aec7a9..486f2583a5a7 100644 --- a/migration-engine/core/src/rpc.rs +++ b/migration-engine/core/src/rpc.rs @@ -3,9 +3,9 @@ use jsonrpc_core::{types::error::Error as JsonRpcError, IoHandler, Params}; use std::sync::Arc; /// Initialize a JSON-RPC ready migration engine API. -pub fn rpc_api(datamodel: Option, host: Arc) -> IoHandler { +pub fn rpc_api(prisma_schema: Option, host: Arc) -> IoHandler { let mut io_handler = IoHandler::default(); - let api = Arc::new(crate::state::EngineState::new(datamodel, Some(host))); + let api = Arc::new(crate::state::EngineState::new(prisma_schema, Some(host))); for cmd in METHOD_NAMES { let api = api.clone(); @@ -41,7 +41,7 @@ async fn run_command( MARK_MIGRATION_APPLIED => render(executor.mark_migration_applied(params.parse()?).await), MARK_MIGRATION_ROLLED_BACK => render(executor.mark_migration_rolled_back(params.parse()?).await), // TODO(MultiSchema): we probably need to grab the namespaces from the params - RESET => render(executor.reset(None).await), + RESET => render(executor.reset().await), SCHEMA_PUSH => render(executor.schema_push(params.parse()?).await), other => unreachable!("Unknown command {}", other), } diff --git a/migration-engine/core/src/state.rs b/migration-engine/core/src/state.rs index e7a34e4ca237..df59a300b3a4 100644 --- a/migration-engine/core/src/state.rs +++ b/migration-engine/core/src/state.rs @@ -19,7 +19,7 @@ use tracing_futures::Instrument; /// channels. That ensures that each connector is handling requests one at a time to avoid /// synchronization issues. You can think of it in terms of the actor model. pub(crate) struct EngineState { - initial_datamodel: Option, + initial_datamodel: Option, host: Arc, // A map from either: // @@ -46,7 +46,7 @@ type ErasedConnectorRequest = Box< impl EngineState { pub(crate) fn new(initial_datamodel: Option, host: Option>) -> Self { EngineState { - initial_datamodel, + initial_datamodel: initial_datamodel.map(|s| psl::validate(s.into())), host: host.unwrap_or_else(|| Arc::new(migration_connector::EmptyHost)), connectors: Default::default(), } @@ -170,7 +170,7 @@ impl EngineState { return Err(ConnectorError::from_msg("Missing --datamodel".to_owned())); }; - self.with_connector_for_schema(schema, None, f).await + self.with_connector_for_schema(schema.db.source(), None, f).await } } @@ -366,9 +366,18 @@ impl GenericApi for EngineState { .await } - async fn reset(&self, namespaces: Option) -> CoreResult<()> { + async fn reset(&self) -> CoreResult<()> { tracing::debug!("Resetting the database."); + let namespaces: Option = self + .initial_datamodel + .as_ref() + .and_then(|schema| schema.configuration.datasources.first()) + .and_then(|ds| { + let mut names = ds.namespaces.iter().map(|(ns, _)| ns.to_owned()).collect(); + Namespaces::from_vec(&mut names) + }); + self.with_default_connector(Box::new(move |connector| { Box::pin(MigrationConnector::reset(connector, false, namespaces).instrument(tracing::info_span!("Reset"))) })) diff --git a/migration-engine/migration-engine-tests/src/assertions.rs b/migration-engine/migration-engine-tests/src/assertions.rs index 18a80afa30d6..f68881270a8c 100644 --- a/migration-engine/migration-engine-tests/src/assertions.rs +++ b/migration-engine/migration-engine-tests/src/assertions.rs @@ -71,6 +71,7 @@ impl SchemaAssertion { } } + #[track_caller] fn assert_error(&self, table_name: &str, positive: bool) -> ! { let method = if positive { "assert_table" diff --git a/migration-engine/migration-engine-tests/src/test_api.rs b/migration-engine/migration-engine-tests/src/test_api.rs index e172e321803f..760bd4f638be 100644 --- a/migration-engine/migration-engine-tests/src/test_api.rs +++ b/migration-engine/migration-engine-tests/src/test_api.rs @@ -1,7 +1,10 @@ pub use crate::assertions::{MigrationsAssertions, ResultSetExt, SchemaAssertion}; pub use expect_test::expect; -pub use migration_core::json_rpc::types::{ - DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer, +pub use migration_core::{ + json_rpc::types::{ + DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer, + }, + migration_connector::Namespaces, }; pub use test_macros::test_connector; pub use test_setup::{runtime::run_with_thread_local_runtime as tok, BitFlags, Capabilities, Tags}; @@ -10,7 +13,7 @@ use crate::{commands::*, multi_engine_test_api::TestApi as RootTestApi}; use migration_core::{ commands::diff, migration_connector::{ - BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence, Namespaces, + BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence, }, }; use psl::parser_database::SourceFile; diff --git a/migration-engine/migration-engine-tests/tests/migrations/reset_tests.rs b/migration-engine/migration-engine-tests/tests/migrations/reset_tests.rs index 8eebb8e47f06..9626a2655204 100644 --- a/migration-engine/migration-engine-tests/tests/migrations/reset_tests.rs +++ b/migration-engine/migration-engine-tests/tests/migrations/reset_tests.rs @@ -93,3 +93,63 @@ fn reset_then_diagnostics_with_migrations_directory_works(api: TestApi) { .assert_has_table("Cat") .assert_has_table("_prisma_migrations"); } + +#[test_connector( + tags(Postgres), + exclude(CockroachDb), + namespaces("felines", "rodents"), + preview_features("multiSchema") +)] +fn multi_schema_reset(mut api: TestApi) { + let prisma_schema = format! { + r#" + {} + + generator js {{ + provider = "prisma-client-js" + previewFeatures = ["multiSchema"] + }} + + model Manul {{ + id Int @id + @@schema("felines") + }} + + model Capybara {{ + id Int @id + @@schema("rodents") + }} + "#, api.datasource_block_with(&[("schemas", r#"["felines", "rodents"]"#)]) + }; + api.schema_push(&prisma_schema) + .send() + .assert_green() + .assert_has_executed_steps(); + + let migrations_dir = api.create_migrations_directory(); + api.apply_migrations(&migrations_dir).send_sync(); + api.raw_cmd("CREATE TABLE randomTable (id INTEGER PRIMARY KEY);"); + + let all_namespaces = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into(), api.schema_name()]); + let namespaces_in_psl = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into()]); + + api.assert_schema_with_namespaces(all_namespaces.clone()) + .assert_has_table("randomtable") + .assert_has_table("_prisma_migrations") + .assert_has_table("Manul") + .assert_has_table("Capybara"); + + api.reset().send_sync(namespaces_in_psl); + + api.assert_schema_with_namespaces(all_namespaces) + .assert_has_table("randomtable") // we do not want to wipe the schema from search_path + .assert_has_no_table("_prisma_migrations") + .assert_has_no_table("Manul") + .assert_has_no_table("Capybara"); + + // Check that we can migrate from there. + api.schema_push(&prisma_schema) + .send() + .assert_green() + .assert_has_executed_steps(); +} diff --git a/migration-engine/qe-setup/src/mssql.rs b/migration-engine/qe-setup/src/mssql.rs index 0bd3f5fab6cc..f56e4c229495 100644 --- a/migration-engine/qe-setup/src/mssql.rs +++ b/migration-engine/qe-setup/src/mssql.rs @@ -22,7 +22,7 @@ pub(crate) async fn mssql_setup(url: String, prisma_schema: &str, db_schemas: &[ conn.raw_cmd(&format!("USE [{db_name}];")).await.unwrap(); } else { let api = migration_core::migration_api(Some(prisma_schema.to_owned()), None)?; - api.reset(None).await.ok(); + api.reset().await.ok(); // Without these, our poor connection gets deadlocks if other schemas // are modified while we introspect. let allow_snapshot_isolation = format!(