Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

me: implement multi-schema reset on postgres #3459

Merged
merged 2 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions libs/test-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ async fn main() -> anyhow::Result<()> {
let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?;
let api = migration_core::migration_api(Some(schema), None)?;

// TODO(MultiSchema): Perhaps read namespaces from the schema somehow.
api.reset(None).await?;
api.reset().await?;
}
Command::CreateDatabase(cmd) => {
let schema = read_datamodel_from_file(&cmd.schema_path).context("Error reading the schema from file")?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,12 @@ impl Namespaces {
}
}
}

impl IntoIterator for Namespaces {
type Item = String;
type IntoIter = std::iter::Chain<std::iter::Once<String>, <Vec<String> as IntoIterator>::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
std::iter::once(self.0).chain(self.1.into_iter())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ pub(crate) trait SqlFlavour:
fn raw_cmd<'a>(&'a mut self, sql: &'a str) -> BoxFuture<'a, ConnectorResult<()>>;

/// Drop the database and recreate it empty.
fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>>;
fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>>;

/// Optionally scan a migration script that could have been altered by users and emit warnings.
fn scan_migration_script(&self, _script: &str) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl SqlFlavour for MssqlFlavour {
})
}

fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(&mut self.state, move |params, connection| async move {
let schema_name = params.url.schema();

Expand Down Expand Up @@ -402,7 +402,7 @@ impl SqlFlavour for MssqlFlavour {
shadow_database.set_params(shadow_db_params)?;
shadow_database.ensure_connection_validity().await?;

if shadow_database.reset().await.is_err() {
if shadow_database.reset(None).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces).await?;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl SqlFlavour for MysqlFlavour {
})
}

fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(&mut self.state, move |params, circumstances, connection| async move {
if circumstances.contains(Circumstances::IsVitess) {
return Err(ConnectorError::from_msg(
Expand Down Expand Up @@ -307,7 +307,7 @@ impl SqlFlavour for MysqlFlavour {
shadow_database.ensure_connection_validity().await?;

tracing::info!("Connecting to user-provided shadow database.");
if shadow_database.reset().await.is_err() {
if shadow_database.reset(None).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces).await?;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use migration_connector::{
};
use quaint::connector::PostgresUrl;
use sql_schema_describer::SqlSchema;
use std::{collections::HashMap, future, time};
use std::{borrow::Cow, collections::HashMap, future, time};
use url::Url;
use user_facing_errors::{
common::{DatabaseAccessDenied, DatabaseDoesNotExist},
Expand Down Expand Up @@ -264,14 +264,27 @@ impl SqlFlavour for PostgresFlavour {
})
}

fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
fn reset(&mut self, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
with_connection(self, move |params, _circumstances, conn| async move {
let schema_name = params.url.schema();
let schemas_to_reset = match namespaces {
Some(ns) => ns.into_iter().map(Cow::Owned).collect(),
None => vec![Cow::Borrowed(params.url.schema())],
};

conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), &params.url)
.await?;
conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), &params.url)
.await?;
tracing::info!(?schemas_to_reset, "Resetting schema(s)");

for schema_name in schemas_to_reset {
conn.raw_cmd(&format!("DROP SCHEMA \"{}\" CASCADE", schema_name), &params.url)
.await?;
conn.raw_cmd(&format!("CREATE SCHEMA \"{}\"", schema_name), &params.url)
.await?;
}

// Drop the migrations table in the main schema, otherwise migrate dev will not
// perceive that as a reset, since migrations are still marked as applied.
//
// We don't care if this fails.
conn.raw_cmd("DROP TABLE _prisma_migrations", &params.url).await.ok();

Ok(())
})
Expand Down Expand Up @@ -342,7 +355,7 @@ impl SqlFlavour for PostgresFlavour {

tracing::info!("Connecting to user-provided shadow database.");

if shadow_database.reset().await.is_err() {
if shadow_database.reset(namespaces.clone()).await.is_err() {
crate::best_effort_reset(&mut shadow_database, namespaces.clone()).await?;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl SqlFlavour for SqliteFlavour {
ready(with_connection(&mut self.state, |_, conn| conn.raw_cmd(sql)))
}

fn reset(&mut self) -> BoxFuture<'_, ConnectorResult<()>> {
fn reset(&mut self, _namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
ready(with_connection(&mut self.state, move |params, connection| {
let file_path = &params.file_path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl MigrationConnector for SqlMigrationConnector {

fn reset(&mut self, soft: bool, namespaces: Option<Namespaces>) -> BoxFuture<'_, ConnectorResult<()>> {
Box::pin(async move {
if soft || self.flavour.reset().await.is_err() {
if soft || self.flavour.reset(namespaces.clone()).await.is_err() {
best_effort_reset(self.flavour.as_mut(), namespaces).await?;
}

Expand Down
4 changes: 1 addition & 3 deletions migration-engine/core/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! The external facing programmatic API to the migration engine.

use migration_connector::Namespaces;

use crate::{commands, json_rpc::types::*, CoreResult};

/// The programmatic, generic, fantastic migration engine API.
Expand Down Expand Up @@ -70,7 +68,7 @@ pub trait GenericApi: Send + Sync + 'static {
) -> CoreResult<MarkMigrationRolledBackOutput>;

/// Reset a database to an empty state (no data, no schema).
async fn reset(&self, namespaces: Option<Namespaces>) -> CoreResult<()>;
async fn reset(&self) -> CoreResult<()>;

/// The command behind `prisma db push`.
async fn schema_push(&self, input: SchemaPushInput) -> CoreResult<SchemaPushOutput>;
Expand Down
6 changes: 3 additions & 3 deletions migration-engine/core/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use jsonrpc_core::{types::error::Error as JsonRpcError, IoHandler, Params};
use std::sync::Arc;

/// Initialize a JSON-RPC ready migration engine API.
pub fn rpc_api(datamodel: Option<String>, host: Arc<dyn migration_connector::ConnectorHost>) -> IoHandler {
pub fn rpc_api(prisma_schema: Option<String>, host: Arc<dyn migration_connector::ConnectorHost>) -> IoHandler {
let mut io_handler = IoHandler::default();
let api = Arc::new(crate::state::EngineState::new(datamodel, Some(host)));
let api = Arc::new(crate::state::EngineState::new(prisma_schema, Some(host)));

for cmd in METHOD_NAMES {
let api = api.clone();
Expand Down Expand Up @@ -41,7 +41,7 @@ async fn run_command(
MARK_MIGRATION_APPLIED => render(executor.mark_migration_applied(params.parse()?).await),
MARK_MIGRATION_ROLLED_BACK => render(executor.mark_migration_rolled_back(params.parse()?).await),
// TODO(MultiSchema): we probably need to grab the namespaces from the params
RESET => render(executor.reset(None).await),
RESET => render(executor.reset().await),
SCHEMA_PUSH => render(executor.schema_push(params.parse()?).await),
other => unreachable!("Unknown command {}", other),
}
Expand Down
17 changes: 13 additions & 4 deletions migration-engine/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing_futures::Instrument;
/// channels. That ensures that each connector is handling requests one at a time to avoid
/// synchronization issues. You can think of it in terms of the actor model.
pub(crate) struct EngineState {
initial_datamodel: Option<String>,
initial_datamodel: Option<psl::ValidatedSchema>,
host: Arc<dyn ConnectorHost>,
// A map from either:
//
Expand All @@ -46,7 +46,7 @@ type ErasedConnectorRequest = Box<
impl EngineState {
pub(crate) fn new(initial_datamodel: Option<String>, host: Option<Arc<dyn ConnectorHost>>) -> Self {
EngineState {
initial_datamodel,
initial_datamodel: initial_datamodel.map(|s| psl::validate(s.into())),
host: host.unwrap_or_else(|| Arc::new(migration_connector::EmptyHost)),
connectors: Default::default(),
}
Expand Down Expand Up @@ -170,7 +170,7 @@ impl EngineState {
return Err(ConnectorError::from_msg("Missing --datamodel".to_owned()));
};

self.with_connector_for_schema(schema, None, f).await
self.with_connector_for_schema(schema.db.source(), None, f).await
}
}

Expand Down Expand Up @@ -366,9 +366,18 @@ impl GenericApi for EngineState {
.await
}

async fn reset(&self, namespaces: Option<Namespaces>) -> CoreResult<()> {
async fn reset(&self) -> CoreResult<()> {
tracing::debug!("Resetting the database.");

let namespaces: Option<Namespaces> = self
.initial_datamodel
.as_ref()
.and_then(|schema| schema.configuration.datasources.first())
.and_then(|ds| {
let mut names = ds.namespaces.iter().map(|(ns, _)| ns.to_owned()).collect();
Namespaces::from_vec(&mut names)
});

self.with_default_connector(Box::new(move |connector| {
Box::pin(MigrationConnector::reset(connector, false, namespaces).instrument(tracing::info_span!("Reset")))
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl SchemaAssertion {
}
}

#[track_caller]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This puts the error on the right line in the test, instead of inside the assertion function.

fn assert_error(&self, table_name: &str, positive: bool) -> ! {
let method = if positive {
"assert_table"
Expand Down
9 changes: 6 additions & 3 deletions migration-engine/migration-engine-tests/src/test_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
pub use crate::assertions::{MigrationsAssertions, ResultSetExt, SchemaAssertion};
pub use expect_test::expect;
pub use migration_core::json_rpc::types::{
DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer,
pub use migration_core::{
json_rpc::types::{
DbExecuteDatasourceType, DbExecuteParams, DiffParams, DiffResult, SchemaContainer, UrlContainer,
},
migration_connector::Namespaces,
};
pub use test_macros::test_connector;
pub use test_setup::{runtime::run_with_thread_local_runtime as tok, BitFlags, Capabilities, Tags};
Expand All @@ -10,7 +13,7 @@ use crate::{commands::*, multi_engine_test_api::TestApi as RootTestApi};
use migration_core::{
commands::diff,
migration_connector::{
BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence, Namespaces,
BoxFuture, ConnectorHost, ConnectorResult, DiffTarget, MigrationConnector, MigrationPersistence,
},
};
use psl::parser_database::SourceFile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,63 @@ fn reset_then_diagnostics_with_migrations_directory_works(api: TestApi) {
.assert_has_table("Cat")
.assert_has_table("_prisma_migrations");
}

#[test_connector(
tags(Postgres),
exclude(CockroachDb),
namespaces("felines", "rodents"),
preview_features("multiSchema")
)]
fn multi_schema_reset(mut api: TestApi) {
let prisma_schema = format! {
r#"
{}

generator js {{
provider = "prisma-client-js"
previewFeatures = ["multiSchema"]
}}

model Manul {{
id Int @id
@@schema("felines")
}}

model Capybara {{
id Int @id
@@schema("rodents")
}}
"#, api.datasource_block_with(&[("schemas", r#"["felines", "rodents"]"#)])
};
api.schema_push(&prisma_schema)
.send()
.assert_green()
.assert_has_executed_steps();

let migrations_dir = api.create_migrations_directory();
api.apply_migrations(&migrations_dir).send_sync();
api.raw_cmd("CREATE TABLE randomTable (id INTEGER PRIMARY KEY);");

let all_namespaces = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into(), api.schema_name()]);
let namespaces_in_psl = Namespaces::from_vec(&mut vec!["felines".into(), "rodents".into()]);

api.assert_schema_with_namespaces(all_namespaces.clone())
.assert_has_table("randomtable")
.assert_has_table("_prisma_migrations")
.assert_has_table("Manul")
.assert_has_table("Capybara");

api.reset().send_sync(namespaces_in_psl);

api.assert_schema_with_namespaces(all_namespaces)
.assert_has_table("randomtable") // we do not want to wipe the schema from search_path
.assert_has_no_table("_prisma_migrations")
.assert_has_no_table("Manul")
.assert_has_no_table("Capybara");

// Check that we can migrate from there.
api.schema_push(&prisma_schema)
.send()
.assert_green()
.assert_has_executed_steps();
}
2 changes: 1 addition & 1 deletion migration-engine/qe-setup/src/mssql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub(crate) async fn mssql_setup(url: String, prisma_schema: &str, db_schemas: &[
conn.raw_cmd(&format!("USE [{db_name}];")).await.unwrap();
} else {
let api = migration_core::migration_api(Some(prisma_schema.to_owned()), None)?;
api.reset(None).await.ok();
api.reset().await.ok();
// Without these, our poor connection gets deadlocks if other schemas
// are modified while we introspect.
let allow_snapshot_isolation = format!(
Expand Down