From a1e9ef67b90334a56b9a17203ea8611ce27108b5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=20Houl=C3=A9?=
Date: Thu, 1 Dec 2022 08:07:34 +0100
Subject: [PATCH] me: implement multi-schema reset on postgres

Before this commit, the migration engine `reset` command only reset the
schema in the search path on PostgreSQL and CockroachDB. This is not the
expected behaviour when working with multiple schemas defined in the
`schemas` datasource property: users expect all of these schemas to be
reset. Moreover, `migrate dev` would be broken if we did not do that,
because it expects `reset` to do its job before re-applying migrations.

In this commit, we take into account the namespaces defined in the Prisma
schema the migration engine is initialized with. We reset these schemas,
when defined, *in addition to* the schema in the search path. The reason
we still take the search path into account is that it is where we create
and expect the _prisma_migrations table to live. I expect we may want to
revisit that design choice (issue:
https://github.com/prisma/prisma/issues/16565).

closes https://github.com/prisma/prisma/issues/16561
---
 libs/test-cli/src/main.rs                     |  3 +-
 .../migration-connector/src/namespaces.rs     |  9 ++++
 .../sql-migration-connector/src/flavour.rs    |  2 +-
 .../src/flavour/mssql.rs                      |  4 +-
 .../src/flavour/mysql.rs                      |  4 +-
 .../src/flavour/postgres.rs                   | 27 +++++++---
 .../src/flavour/sqlite.rs                     |  2 +-
 .../sql-migration-connector/src/lib.rs        |  2 +-
 migration-engine/core/src/api.rs              |  4 +-
 migration-engine/core/src/rpc.rs              |  6 +--
 migration-engine/core/src/state.rs            | 17 +++++--
 .../migration-engine-tests/src/assertions.rs  |  1 +
 .../migration-engine-tests/src/test_api.rs    |  9 ++--
 .../tests/migrations/reset_tests.rs           | 50 +++++++++++++++++++
 migration-engine/qe-setup/src/mssql.rs        |  2 +-
 15 files changed, 111 insertions(+), 31 deletions(-)

diff --git a/libs/test-cli/src/main.rs b/libs/test-cli/src/main.rs
index 87320ad22989..40afe1c05cd3 100644
--- a/libs/test-cli/src/main.rs
+++ b/libs/test-cli/src/main.rs
@@ -236,8 +236,7 @@ async fn main() -> anyhow::Result<()> {
             let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?;
             let api = migration_core::migration_api(Some(schema), None)?;
-            // TODO(MultiSchema): Perhaps read namespaces from the schema somehow.
-            api.reset(None).await?;
+            api.reset().await?;
         }
         Command::CreateDatabase(cmd) => {
             let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?;
diff --git a/migration-engine/connectors/migration-connector/src/namespaces.rs b/migration-engine/connectors/migration-connector/src/namespaces.rs
index 9ba48d96c057..0f18c2ecdb2d 100644
--- a/migration-engine/connectors/migration-connector/src/namespaces.rs
+++ b/migration-engine/connectors/migration-connector/src/namespaces.rs
@@ -28,3 +28,12 @@ impl Namespaces {
         }
     }
 }
+
+impl IntoIterator for Namespaces {
+    type Item = String;
+    type IntoIter = std::iter::Chain<std::iter::Once<String>, <Vec<String> as IntoIterator>::IntoIter>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        std::iter::once(self.0).chain(self.1.into_iter())
+    }
+}
diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour.rs b/migration-engine/connectors/sql-migration-connector/src/flavour.rs
index 1b100134f7a9..7eaf0f23b794 100644
--- a/migration-engine/connectors/sql-migration-connector/src/flavour.rs
+++ b/migration-engine/connectors/sql-migration-connector/src/flavour.rs
@@ -241,7 +241,7 @@ pub(crate) trait SqlFlavour:
     fn raw_cmd<'a>(&'a mut self, sql: &'a str) -> BoxFuture<'a, ConnectorResult<()>>;

     /// Drop the database and recreate it empty.
-    fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;
+    fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>>;

     /// Optionally scan a migration script that could have been altered by users and emit warnings.
     fn scan_migration_script(&self, _script: &str) {}
diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour/mssql.rs b/migration-engine/connectors/sql-migration-connector/src/flavour/mssql.rs
index 24ee557668ac..1d1ee32e084c 100644
--- a/migration-engine/connectors/sql-migration-connector/src/flavour/mssql.rs
+++ b/migration-engine/connectors/sql-migration-connector/src/flavour/mssql.rs
@@ -216,7 +216,7 @@ impl SqlFlavour for MssqlFlavour {
         })
     }

-    fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
+    fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
         with_connection(&mut self.state, move |params, connection| async move {
             let schema_name = params.url.schema();

@@ -402,7 +402,7 @@ impl SqlFlavour for MssqlFlavour {
         shadow_database.set_params(shadow_db_params)?;
         shadow_database.ensure_connection_validity().await?;

-        if shadow_database.reset().await.is_err() {
+        if shadow_database.reset(None).await.is_err() {
             crate::best_effort_reset(&mut shadow_database, namespaces).await?;
         }

diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour/mysql.rs b/migration-engine/connectors/sql-migration-connector/src/flavour/mysql.rs
index 152836e449b8..3ae6d310ad5f 100644
--- a/migration-engine/connectors/sql-migration-connector/src/flavour/mysql.rs
+++ b/migration-engine/connectors/sql-migration-connector/src/flavour/mysql.rs
@@ -231,7 +231,7 @@ impl SqlFlavour for MysqlFlavour {
         })
     }

-    fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
+    fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
         with_connection(&mut self.state, move |params, circumstances, connection| async move {
             if circumstances.contains(Circumstances::IsVitess) {
                 return Err(ConnectorError::from_msg(
@@ -307,7 +307,7 @@ impl SqlFlavour for MysqlFlavour {
         shadow_database.ensure_connection_validity().await?;

         tracing::info!("Connecting to user-provided shadow database.");
-        if shadow_database.reset().await.is_err() {
+        if shadow_database.reset(None).await.is_err() {
             crate::best_effort_reset(&mut shadow_database, namespaces).await?;
         }

diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour/postgres.rs b/migration-engine/connectors/sql-migration-connector/src/flavour/postgres.rs
index 3186a414967b..7c0fa45c2624 100644
--- a/migration-engine/connectors/sql-migration-connector/src/flavour/postgres.rs
+++ b/migration-engine/connectors/sql-migration-connector/src/flavour/postgres.rs
@@ -10,7 +10,7 @@ use migration_connector::{
 };
 use quaint::connector::PostgresUrl;
 use sql_schema_describer::SqlSchema;
-use std::{collections::HashMap, future, time};
+use std::{borrow::Cow, collections::HashMap, future, time};
 use url::Url;
 use user_facing_errors::{
     common::{DatabaseAccessDenied, DatabaseDoesNotExist},
@@ -264,14 +264,25 @@ impl SqlFlavour for PostgresFlavour {
         })
     }

-    fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
+    fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
         with_connection(self, move |params, _circumstances, conn| async move {
-            let schema_name = params.url.schema();
+            let mut schemas = vec![Cow::Borrowed(params.url.schema())];

-            conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), &params.url)
-                .await?;
-            conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), &params.url)
-                .await?;
+            // We reset the namespaces defined in the `schemas` datasource property _in addition to
+            // the schema in the search path_, because the search path is where we create the
+            // migrations table.
+            for namespace in namespaces.into_iter().flatten() {
+                schemas.push(Cow::Owned(namespace))
+            }
+
+            tracing::info!(?schemas, "Resetting schema(s)");
+
+            for schema_name in schemas {
+                conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), &params.url)
+                    .await?;
+                conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), &params.url)
+                    .await?;
+            }

             Ok(())
         })
@@ -342,7 +353,7 @@ impl SqlFlavour for PostgresFlavour {

         tracing::info!("Connecting to user-provided shadow database.");

-        if shadow_database.reset().await.is_err() {
+        if shadow_database.reset(namespaces.clone()).await.is_err() {
             crate::best_effort_reset(&mut shadow_database, namespaces.clone()).await?;
         }

diff --git a/migration-engine/connectors/sql-migration-connector/src/flavour/sqlite.rs b/migration-engine/connectors/sql-migration-connector/src/flavour/sqlite.rs
index c775a6d7d52f..a73a299300be 100644
--- a/migration-engine/connectors/sql-migration-connector/src/flavour/sqlite.rs
+++ b/migration-engine/connectors/sql-migration-connector/src/flavour/sqlite.rs
@@ -245,7 +245,7 @@ impl SqlFlavour for SqliteFlavour {
         ready(with_connection(&mut self.state, |_, conn| conn.raw_cmd(sql)))
     }

-    fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
+    fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
         ready(with_connection(&mut self.state, move |params, connection| {
             let file_path = &params.file_path;

diff --git a/migration-engine/connectors/sql-migration-connector/src/lib.rs b/migration-engine/connectors/sql-migration-connector/src/lib.rs
index 925b5e377ecd..4f02986015e0 100644
--- a/migration-engine/connectors/sql-migration-connector/src/lib.rs
+++ b/migration-engine/connectors/sql-migration-connector/src/lib.rs
@@ -262,7 +262,7 @@ impl MigrationConnector for SqlMigrationConnector {
     fn reset(&mut self, soft: bool, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
         Box::pin(async move {
-            if soft || self.flavour.reset().await.is_err() {
+            if soft || self.flavour.reset(namespaces.clone()).await.is_err() {
                 best_effort_reset(self.flavour.as_mut(), namespaces).await?;
             }

diff --git a/migration-engine/core/src/api.rs b/migration-engine/core/src/api.rs
index 81050d947fc0..89fdaf3e80a6 100644
--- a/migration-engine/core/src/api.rs
+++ b/migration-engine/core/src/api.rs
@@ -1,7 +1,5 @@
 //! The external facing programmatic API to the migration engine.

-use migration_connector::Namespaces;
-
 use crate::{commands, json_rpc::types::*, CoreResult};

 /// The programmatic, generic, fantastic migration engine API.
@@ -70,7 +68,7 @@ pub trait GenericApi: Send + Sync + 'static {
     ) -> CoreResult;

     /// Reset a database to an empty state (no data, no schema).
-    async fn reset(&self, namespaces: Option<Namespaces>) -> CoreResult<()>;
+    async fn reset(&self) -> CoreResult<()>;

     /// The command behind `prisma db push`.
     async fn schema_push(&self, input: SchemaPushInput) -> CoreResult<SchemaPushOutput>;
diff --git a/migration-engine/core/src/rpc.rs b/migration-engine/core/src/rpc.rs
index 05b325aec7a9..486f2583a5a7 100644
--- a/migration-engine/core/src/rpc.rs
+++ b/migration-engine/core/src/rpc.rs
@@ -3,9 +3,9 @@ use jsonrpc_core::{types::error::Error as JsonRpcError, IoHandler, Params};
 use std::sync::Arc;

 /// Initialize a JSON-RPC ready migration engine API.
-pub fn rpc_api(datamodel: Option<String>, host: Arc<dyn ConnectorHost>) -> IoHandler {
+pub fn rpc_api(prisma_schema: Option<String>, host: Arc<dyn ConnectorHost>) -> IoHandler {
     let mut io_handler = IoHandler::default();
-    let api = Arc::new(crate::state::EngineState::new(datamodel, Some(host)));
+    let api = Arc::new(crate::state::EngineState::new(prisma_schema, Some(host)));

     for cmd in METHOD_NAMES {
         let api = api.clone();
@@ -41,7 +41,7 @@ async fn run_command(
         MARK_MIGRATION_APPLIED => render(executor.mark_migration_applied(params.parse()?).await),
         MARK_MIGRATION_ROLLED_BACK => render(executor.mark_migration_rolled_back(params.parse()?).await),
         // TODO(MultiSchema): we probably need to grab the namespaces from the params
-        RESET => render(executor.reset(None).await),
+        RESET => render(executor.reset().await),
         SCHEMA_PUSH => render(executor.schema_push(params.parse()?).await),
         other => unreachable!("Unknown command {}", other),
     }
diff --git a/migration-engine/core/src/state.rs b/migration-engine/core/src/state.rs
index e7a34e4ca237..df59a300b3a4 100644
--- a/migration-engine/core/src/state.rs
+++ b/migration-engine/core/src/state.rs
@@ -19,7 +19,7 @@ use tracing_futures::Instrument;
 /// channels. That ensures that each connector is handling requests one at a time to avoid
 /// synchronization issues. You can think of it in terms of the actor model.
 pub(crate) struct EngineState {
-    initial_datamodel: Option<String>,
+    initial_datamodel: Option<psl::ValidatedSchema>,
     host: Arc<dyn ConnectorHost>,
     // A map from either:
     //
@@ -46,7 +46,7 @@ type ErasedConnectorRequest = Box<
 impl EngineState {
     pub(crate) fn new(initial_datamodel: Option<String>, host: Option<Arc<dyn ConnectorHost>>) -> Self {
         EngineState {
-            initial_datamodel,
+            initial_datamodel: initial_datamodel.map(|s| psl::validate(s.into())),
             host: host.unwrap_or_else(|| Arc::new(migration_connector::EmptyHost)),
             connectors: Default::default(),
         }
@@ -170,7 +170,7 @@ impl EngineState {
             return Err(ConnectorError::from_msg("Missing --datamodel".to_owned()));
         };

-        self.with_connector_for_schema(schema, None, f).await
+        self.with_connector_for_schema(schema.db.source(), None, f).await
     }
 }

@@ -366,9 +366,18 @@ impl GenericApi for EngineState {
             .await
     }

-    async fn reset(&self, namespaces: Option<Namespaces>) -> CoreResult<()> {
+    async fn reset(&self) -> CoreResult<()> {
         tracing::debug!("Resetting the database.");

+        let namespaces: Option<Namespaces> = self
+            .initial_datamodel
+            .as_ref()
+            .and_then(|schema| schema.configuration.datasources.first())
+            .and_then(|ds| {
+                let mut names = ds.namespaces.iter().map(|(ns, _)| ns.to_owned()).collect();
+                Namespaces::from_vec(&mut names)
+            });
+
         self.with_default_connector(Box::new(move |connector| {
             Box::pin(MigrationConnector::reset(connector, false, namespaces).instrument(tracing::info_span!("Reset")))
         }))
diff --git a/migration-engine/migration-engine-tests/src/assertions.rs b/migration-engine/migration-engine-tests/src/assertions.rs
index 770071c533f6..925eeb509633 100644
--- a/migration-engine/migration-engine-tests/src/assertions.rs
+++ b/migration-engine/migration-engine-tests/src/assertions.rs
@@ -71,6 +71,7 @@ impl SchemaAssertion {
         }
     }

+    #[track_caller]
     fn assert_error(&self, table_name: &str, positive: bool) -> ! {
         let method = if positive {
             "assert_table"
diff --git a/migration-engine/migration-engine-tests/src/test_api.rs b/migration-engine/migration-engine-tests/src/test_api.rs
index e172e321803f..760bd4f638be 100644
--- a/migration-engine/migration-engine-tests/src/test_api.rs
+++ b/migration-engine/migration-engine-tests/src/test_api.rs
@@ -1,7 +1,10 @@
 pub use crate::assertions::{MigrationsAssertions, ResultSetExt, SchemaAssertion};
 pub use expect_test::expect;
-pub use migration_core::json_rpc::types::{
-    DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer,
+pub use migration_core::{
+    json_rpc::types::{
+        DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer,
+    },
+    migration_connector::Namespaces,
 };
 pub use test_macros::test_connector;
 pub use test_setup::{runtime::run_with_thread_local_runtime as tok, BitFlags, Capabilities, Tags};
@@ -10,7 +13,7 @@ use crate::{commands::*, multi_engine_test_api::TestApi as RootTestApi};
 use migration_core::{
     commands::diff,
     migration_connector::{
-        BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence, Namespaces,
+        BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence,
     },
 };
 use psl::parser_database::SourceFile;
diff --git a/migration-engine/migration-engine-tests/tests/migrations/reset_tests.rs b/migration-engine/migration-engine-tests/tests/migrations/reset_tests.rs
index 8eebb8e47f06..814e309bf51c 100644
--- a/migration-engine/migration-engine-tests/tests/migrations/reset_tests.rs
+++ b/migration-engine/migration-engine-tests/tests/migrations/reset_tests.rs
@@ -93,3 +93,53 @@ fn reset_then_diagnostics_with_migrations_directory_works(api: TestApi) {
         .assert_has_table("Cat")
         .assert_has_table("_prisma_migrations");
 }
+
+#[test_connector(
+    tags(Postgres),
+    exclude(CockroachDb),
+    namespaces("felines", "rodents"),
+    preview_features("multiSchema")
+)]
+fn multi_schema_reset(mut api: TestApi) {
+    let prisma_schema = format! {
+        r#"
+        {}
+
+        generator js {{
+            provider = "prisma-client-js"
+            previewFeatures = ["multiSchema"]
+        }}
+
+        model Manul {{
+            id Int @id
+            @@schema("felines")
+        }}
+
+        model Capybara {{
+            id Int @id
+            @@schema("rodents")
+        }}
+        "#, api.datasource_block_with(&[("schemas", r#"["felines", "rodents"]"#)])
+    };
+
+    api.schema_push(&prisma_schema)
+        .send()
+        .assert_green()
+        .assert_has_executed_steps();
+
+    let namespaces = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into()]);
+
+    api.assert_schema_with_namespaces(namespaces.clone())
+        .assert_has_table("Manul")
+        .assert_has_table("Capybara");
+
+    api.reset().send_sync(namespaces.clone());
+
+    api.assert_schema_with_namespaces(namespaces)
+        .assert_has_no_table("Manul")
+        .assert_has_no_table("Capybara");
+
+    // Check that we can migrate from there.
+    api.schema_push(&prisma_schema)
+        .send()
+        .assert_green()
+        .assert_has_executed_steps();
+}
diff --git a/migration-engine/qe-setup/src/mssql.rs b/migration-engine/qe-setup/src/mssql.rs
index 0bd3f5fab6cc..f56e4c229495 100644
--- a/migration-engine/qe-setup/src/mssql.rs
+++ b/migration-engine/qe-setup/src/mssql.rs
@@ -22,7 +22,7 @@ pub(crate) async fn mssql_setup(url: String, prisma_schema: &str, db_schemas: &[
         conn.raw_cmd(&format!("USE [{db_name}];")).await.unwrap();
     } else {
         let api = migration_core::migration_api(Some(prisma_schema.to_owned()), None)?;
-        api.reset(None).await.ok();
+        api.reset().await.ok();
         // Without these, our poor connection gets deadlocks if other schemas
        // are modified while we introspect.
         let allow_snapshot_isolation = format!(
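
Editor's note (not part of the patch): the standalone Rust sketch below illustrates the behaviour introduced above, namely how the namespaces from the `schemas` datasource property are combined with the search-path schema and which SQL statements the Postgres `reset` then issues. The `Namespaces` type here is only a stand-in mirroring the shape used in migration-connector (one guaranteed namespace plus a vector of additional ones), and `reset_statements` is an illustrative helper that does not exist in the codebase.

// Stand-in for migration_connector::Namespaces: one guaranteed namespace plus extras.
struct Namespaces(String, Vec<String>);

impl Namespaces {
    // Mirrors `Namespaces::from_vec`: returns `None` when no namespace is defined.
    fn from_vec(v: &mut Vec<String>) -> Option<Self> {
        if v.is_empty() {
            return None;
        }
        let first = v.remove(0);
        Some(Namespaces(first, std::mem::take(v)))
    }
}

// Same iterator shape as the impl added in namespaces.rs above.
impl IntoIterator for Namespaces {
    type Item = String;
    type IntoIter = std::iter::Chain<std::iter::Once<String>, <Vec<String> as IntoIterator>::IntoIter>;

    fn into_iter(self) -> Self::IntoIter {
        std::iter::once(self.0).chain(self.1.into_iter())
    }
}

// Builds the statements the Postgres flavour now runs on reset: the schema in the
// search path first (where _prisma_migrations lives), then every namespace from the
// `schemas` datasource property.
fn reset_statements(search_path_schema: &str, namespaces: Option<Namespaces>) -> Vec<String> {
    let mut schemas = vec![search_path_schema.to_owned()];
    schemas.extend(namespaces.into_iter().flatten());

    schemas
        .iter()
        .flat_map(|s| [format!("DROP SCHEMA \"{s}\" CASCADE"), format!("CREATE SCHEMA \"{s}\"")])
        .collect()
}

fn main() {
    // With schemas = ["felines", "rodents"], reset drops and recreates three schemas.
    let namespaces = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into()]);
    for statement in reset_statements("public", namespaces) {
        println!("{statement}");
    }
}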