me: implement multi-schema reset on postgres
Before this commit, the migration engine `reset` command resets only the
schema in the search path, on PostgreSQL and CockroachDB.

This is not the expected behaviour when working with multiple schemas
defined in the `schemas` datasource property: users expect all schemas to
be reset. Moreover, `migrate dev` will be broken if we do not do that,
because it expects `reset` to do its job before re-applying migrations.

In this commit, we take into account the namespaces defined in the
Prisma schema the migration engine is initialized with. We reset these
schemas, when defined, *in addition to* the schema in the search path.
The reason we still take the search path into account is that it is
where we create, and expect to find, the `_prisma_migrations` table. I
expect we may want to revisit that design choice
(issue: prisma/prisma#16565).

closes prisma/prisma#16561
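
Concretely, the new behaviour on PostgreSQL boils down to the sketch below.
It is illustrative only: the `reset_statements` helper, the `main` driver,
and the schema names are assumptions for the example, not code from this
commit.

/// Sketch: the SQL statement sequence the new reset conceptually issues.
/// The search-path schema (where `_prisma_migrations` lives) is always
/// included; the extra names come from the `schemas` datasource property.
fn reset_statements(search_path_schema: &str, namespaces: &[&str]) -> Vec<String> {
    std::iter::once(search_path_schema)
        .chain(namespaces.iter().copied())
        .flat_map(|schema| {
            [
                format!("DROP SCHEMA \"{schema}\" CASCADE"),
                format!("CREATE SCHEMA \"{schema}\""),
            ]
        })
        .collect()
}

fn main() {
    // Assumed setup: search path is "public", schemas = ["felines", "rodents"].
    for stmt in reset_statements("public", &["felines", "rodents"]) {
        println!("{stmt}");
    }
}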
tomhoule committed Dec 1, 2022
1 parent 9527a31 commit a1e9ef6
Showing 15 changed files with 111 additions and 31 deletions.
3 changes: 1 addition & 2 deletions libs/test-cli/src/main.rs
@@ -236,8 +236,7 @@ async fn main() -> anyhow::Result<()> {
let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?;
let api = migration_core::migration_api(Some(schema), None)?;

- // TODO(MultiSchema): Perhaps read namespaces from the schema somehow.
- api.reset(None).await?;
+ api.reset().await?;
}
Command::CreateDatabase(cmd) => {
let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?;
@@ -28,3 +28,12 @@ impl Namespaces {
}
}
}

+ impl IntoIterator for Namespaces {
+     type Item = String;
+     type IntoIter = std::iter::Chain<std::iter::Once<String>, <Vec<String> as IntoIterator>::IntoIter>;
+
+     fn into_iter(self) -> Self::IntoIter {
+         std::iter::once(self.0).chain(self.1.into_iter())
+     }
+ }
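
Since `Namespaces` keeps its first schema name separate from the rest, the
chained iterator above always yields at least one name. Below is a
self-contained sketch of the same pattern; the `Namespaces` definition is a
hypothetical mirror of the connector's type, assumed for the example.

// Hypothetical mirror of the connector's Namespaces type: storing the first
// name outside the Vec makes the collection non-empty by construction.
struct Namespaces(String, Vec<String>);

impl IntoIterator for Namespaces {
    type Item = String;
    type IntoIter = std::iter::Chain<std::iter::Once<String>, std::vec::IntoIter<String>>;

    fn into_iter(self) -> Self::IntoIter {
        std::iter::once(self.0).chain(self.1)
    }
}

fn main() {
    let ns = Namespaces("felines".to_owned(), vec!["rodents".to_owned()]);
    // The first schema comes out first, then the rest, in order.
    assert_eq!(ns.into_iter().collect::<Vec<_>>(), ["felines", "rodents"]);
}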
@@ -241,7 +241,7 @@ pub(crate) trait SqlFlavour:
fn raw_cmd<'a>(&'a mut self, sql: &'a str) -> BoxFuture<'a, ConnectorResult<()>>;

/// Drop the database and recreate it empty.
- fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;
+ fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>>;

/// Optionally scan a migration script that could have been altered by users and emit warnings.
fn scan_migration_script(&self, _script: &str) {}
@@ -216,7 +216,7 @@ impl SqlFlavour for MssqlFlavour {
})
}

- fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
+ fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(&mut self.state, move |params, connection| async move {
let schema_name = params.url.schema();

@@ -402,7 +402,7 @@ impl SqlFlavour for MssqlFlavour {
shadow_database.set_params(shadow_db_params)?;
shadow_database.ensure_connection_validity().await?;

- if shadow_database.reset().await.is_err() {
+ if shadow_database.reset(None).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces).await?;
}

@@ -231,7 +231,7 @@ impl SqlFlavour for MysqlFlavour {
})
}

- fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
+ fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(&mut self.state, move |params, circumstances, connection| async move {
if circumstances.contains(Circumstances::IsVitess) {
return Err(ConnectorError::from_msg(
@@ -307,7 +307,7 @@ impl SqlFlavour for MysqlFlavour {
shadow_database.ensure_connection_validity().await?;

tracing::info!("Connecting to user-provided shadow database.");
- if shadow_database.reset().await.is_err() {
+ if shadow_database.reset(None).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces).await?;
}

@@ -10,7 +10,7 @@ use migration_connector::{
};
use quaint::connector::PostgresUrl;
use sql_schema_describer::SqlSchema;
- use std::{collections::HashMap, future, time};
+ use std::{borrow::Cow, collections::HashMap, future, time};
use url::Url;
use user_facing_errors::{
common::{DatabaseAccessDenied, DatabaseDoesNotExist},
@@ -264,14 +264,25 @@ impl SqlFlavour for PostgresFlavour {
})
}

- fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
+ fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(self, move |params, _circumstances, conn| async move {
- let schema_name = params.url.schema();
+ let mut schemas = vec![Cow::Borrowed(params.url.schema())];

conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), &params.url)
.await?;
conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), &params.url)
.await?;
+ // We reset the namespaces defined in the `schemas` datasource property _in addition to
+ // the schema in the search path_, because the search path is where we create the
+ // migrations table.
+ for namespace in namespaces.into_iter().flatten() {
+     schemas.push(Cow::Owned(namespace))
+ }
+
+ tracing::info!(?schemas, "Resetting schema(s)");
+
+ for schema_name in schemas {
+     conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), &params.url)
+         .await?;
+     conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), &params.url)
+         .await?;
+ }

Ok(())
})
@@ -342,7 +353,7 @@ impl SqlFlavour for PostgresFlavour {

tracing::info!("Connecting to user-provided shadow database.");

- if shadow_database.reset().await.is_err() {
+ if shadow_database.reset(namespaces.clone()).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces.clone()).await?;
}

@@ -245,7 +245,7 @@ impl SqlFlavour for SqliteFlavour {
ready(with_connection(&mut self.state, |_, conn| conn.raw_cmd(sql)))
}

- fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
+ fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
ready(with_connection(&mut self.state, move |params, connection| {
let file_path = &params.file_path;

@@ -262,7 +262,7 @@ impl MigrationConnector for SqlMigrationConnector {

fn reset(&mut self, soft: bool, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
Box::pin(async move {
- if soft || self.flavour.reset().await.is_err() {
+ if soft || self.flavour.reset(namespaces.clone()).await.is_err() {
best_effort_reset(self.flavour.as_mut(), namespaces).await?;
}

4 changes: 1 addition & 3 deletions migration-engine/core/src/api.rs
@@ -1,7 +1,5 @@
//! The external facing programmatic API to the migration engine.

- use migration_connector::Namespaces;
-
use crate::{commands, json_rpc::types::*, CoreResult};

/// The programmatic, generic, fantastic migration engine API.
@@ -70,7 +68,7 @@ pub trait GenericApi: Send + Sync + 'static {
) -> CoreResult<MarkMigrationRolledBackOutput>;

/// Reset a database to an empty state (no data, no schema).
- async fn reset(&self, namespaces: Option<Namespaces>) -> CoreResult<()>;
+ async fn reset(&self) -> CoreResult<()>;

/// The command behind `prisma db push`.
async fn schema_push(&self, input: SchemaPushInput) -> CoreResult<SchemaPushOutput>;
6 changes: 3 additions & 3 deletions migration-engine/core/src/rpc.rs
@@ -3,9 +3,9 @@ use jsonrpc_core::{types::error::Error as JsonRpcError, IoHandler, Params};
use std::sync::Arc;

/// Initialize a JSON-RPC ready migration engine API.
- pub fn rpc_api(datamodel: Option<String>, host: Arc<dyn migration_connector::ConnectorHost>) -> IoHandler {
+ pub fn rpc_api(prisma_schema: Option<String>, host: Arc<dyn migration_connector::ConnectorHost>) -> IoHandler {
let mut io_handler = IoHandler::default();
- let api = Arc::new(crate::state::EngineState::new(datamodel, Some(host)));
+ let api = Arc::new(crate::state::EngineState::new(prisma_schema, Some(host)));

for cmd in METHOD_NAMES {
let api = api.clone();
@@ -41,7 +41,7 @@ async fn run_command(
MARK_MIGRATION_APPLIED => render(executor.mark_migration_applied(params.parse()?).await),
MARK_MIGRATION_ROLLED_BACK => render(executor.mark_migration_rolled_back(params.parse()?).await),
// TODO(MultiSchema): we probably need to grab the namespaces from the params
- RESET => render(executor.reset(None).await),
+ RESET => render(executor.reset().await),
SCHEMA_PUSH => render(executor.schema_push(params.parse()?).await),
other => unreachable!("Unknown command {}", other),
}
17 changes: 13 additions & 4 deletions migration-engine/core/src/state.rs
@@ -19,7 +19,7 @@ use tracing_futures::Instrument;
/// channels. That ensures that each connector is handling requests one at a time to avoid
/// synchronization issues. You can think of it in terms of the actor model.
pub(crate) struct EngineState {
- initial_datamodel: Option<String>,
+ initial_datamodel: Option<psl::ValidatedSchema>,
host: Arc<dyn ConnectorHost>,
// A map from either:
//
@@ -46,7 +46,7 @@ type ErasedConnectorRequest = Box<
impl EngineState {
pub(crate) fn new(initial_datamodel: Option<String>, host: Option<Arc<dyn ConnectorHost>>) -> Self {
EngineState {
- initial_datamodel,
+ initial_datamodel: initial_datamodel.map(|s| psl::validate(s.into())),
host: host.unwrap_or_else(|| Arc::new(migration_connector::EmptyHost)),
connectors: Default::default(),
}
@@ -170,7 +170,7 @@ impl EngineState {
return Err(ConnectorError::from_msg("Missing --datamodel".to_owned()));
};

- self.with_connector_for_schema(schema, None, f).await
+ self.with_connector_for_schema(schema.db.source(), None, f).await
}
}

@@ -366,9 +366,18 @@ impl GenericApi for EngineState {
.await
}

- async fn reset(&self, namespaces: Option<Namespaces>) -> CoreResult<()> {
+ async fn reset(&self) -> CoreResult<()> {
tracing::debug!("Resetting the database.");

+ let namespaces: Option<Namespaces> = self
+     .initial_datamodel
+     .as_ref()
+     .and_then(|schema| schema.configuration.datasources.first())
+     .and_then(|ds| {
+         let mut names = ds.namespaces.iter().map(|(ns, _)| ns.to_owned()).collect();
+         Namespaces::from_vec(&mut names)
+     });
+
self.with_default_connector(Box::new(move |connector| {
Box::pin(MigrationConnector::reset(connector, false, namespaces).instrument(tracing::info_span!("Reset")))
}))
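The `Namespaces::from_vec` call above presumably returns `None` for an empty
vector, so a schema without the `schemas` property falls back to resetting
only the search-path schema. A sketch of that contract, reusing the
hypothetical `Namespaces` mirror from above (the real constructor may differ
in its details):

// Hypothetical sketch of the Namespaces::from_vec contract: empty input
// yields None; otherwise the first name is split off so the resulting
// collection is non-empty by construction.
struct Namespaces(String, Vec<String>);

impl Namespaces {
    fn from_vec(names: &mut Vec<String>) -> Option<Namespaces> {
        if names.is_empty() {
            return None;
        }
        let first = names.remove(0);
        Some(Namespaces(first, std::mem::take(names)))
    }
}

fn main() {
    assert!(Namespaces::from_vec(&mut Vec::new()).is_none());
    let ns = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into()]).unwrap();
    assert_eq!(ns.0, "felines");
    assert_eq!(ns.1, ["rodents"]);
}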
1 change: 1 addition & 0 deletions migration-engine/migration-engine-tests/src/assertions.rs
@@ -71,6 +71,7 @@ impl SchemaAssertion {
}
}

+ #[track_caller]
fn assert_error(&self, table_name: &str, positive: bool) -> ! {
let method = if positive {
"assert_table"
Expand Down
9 changes: 6 additions & 3 deletions migration-engine/migration-engine-tests/src/test_api.rs
@@ -1,7 +1,10 @@
pub use crate::assertions::{MigrationsAssertions, ResultSetExt, SchemaAssertion};
pub use expect_test::expect;
- pub use migration_core::json_rpc::types::{
-     DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer,
+ pub use migration_core::{
+     json_rpc::types::{
+         DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer,
+     },
+     migration_connector::Namespaces,
};
pub use test_macros::test_connector;
pub use test_setup::{runtime::run_with_thread_local_runtime as tok, BitFlags, Capabilities, Tags};
@@ -10,7 +13,7 @@ use crate::{commands::*, multi_engine_test_api::TestApi as RootTestApi};
use migration_core::{
commands::diff,
migration_connector::{
-     BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence, Namespaces,
+     BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence,
},
};
use psl::parser_database::SourceFile;
@@ -93,3 +93,53 @@ fn reset_then_diagnostics_with_migrations_directory_works(api: TestApi) {
.assert_has_table("Cat")
.assert_has_table("_prisma_migrations");
}

+ #[test_connector(
+     tags(Postgres),
+     exclude(CockroachDb),
+     namespaces("felines", "rodents"),
+     preview_features("multiSchema")
+ )]
+ fn multi_schema_reset(mut api: TestApi) {
+     let prisma_schema = format! {
+         r#"
+         {}
+         generator js {{
+             provider = "prisma-client-js"
+             previewFeatures = ["multiSchema"]
+         }}
+         model Manul {{
+             id Int @id
+             @@schema("felines")
+         }}
+         model Capybara {{
+             id Int @id
+             @@schema("rodents")
+         }}
+         "#, api.datasource_block_with(&[("schemas", r#"["felines", "rodents"]"#)])
+     };
+     api.schema_push(&prisma_schema)
+         .send()
+         .assert_green()
+         .assert_has_executed_steps();
+     let namespaces = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into()]);
+
+     api.assert_schema_with_namespaces(namespaces.clone())
+         .assert_has_table("Manul")
+         .assert_has_table("Capybara");
+
+     api.reset().send_sync(namespaces.clone());
+
+     api.assert_schema_with_namespaces(namespaces)
+         .assert_has_no_table("Manul")
+         .assert_has_no_table("Capybara");
+
+     // Check that we can migrate from there.
+     api.schema_push(&prisma_schema)
+         .send()
+         .assert_green()
+         .assert_has_executed_steps();
+ }
2 changes: 1 addition & 1 deletion migration-engine/qe-setup/src/mssql.rs
@@ -22,7 +22,7 @@ pub(crate) async fn mssql_setup(url: String, prisma_schema: &str, db_schemas: &[
conn.raw_cmd(&format!("USE [{db_name}];")).await.unwrap();
} else {
let api = migration_core::migration_api(Some(prisma_schema.to_owned()), None)?;
- api.reset(None).await.ok();
+ api.reset().await.ok();
// Without these, our poor connection gets deadlocks if other schemas
// are modified while we introspect.
let allow_snapshot_isolation = format!(
