diff --git a/.envrc b/.envrc index 1e61120eb2df..6ee663500267 100644 --- a/.envrc +++ b/.envrc @@ -12,7 +12,6 @@ export PRISMA_BINARY_PATH=$(pwd)/target/release/query-engine ### QE config vars, set to testing values ### export SQLITE_MAX_VARIABLE_NUMBER=250000 # This must be in sync with the setting in the engineer build CLI export QUERY_BATCH_SIZE=10 # Max size of IN statement vars. -export CLOSED_TX_CLEANUP=2 # Time in seconds when a closed interactive transaction will be removed from the cache. ### QE test setup vars ### export LOG_LEVEL=trace diff --git a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/interactive_tx.rs b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/interactive_tx.rs index 048c606477dc..55c37f7ad6e5 100644 --- a/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/interactive_tx.rs +++ b/query-engine/connector-test-kit-rs/query-engine-tests/tests/new/interactive_tx.rs @@ -1,7 +1,6 @@ use query_engine_tests::test_suite; use std::borrow::Cow; -/// Note that if cache expiration tests fail, make sure `CLOSED_TX_CLEANUP` is set correctly (low value like 2) from the .envrc. #[test_suite(schema(generic))] mod interactive_tx { use query_engine_tests::*; @@ -91,7 +90,7 @@ mod interactive_tx { assert_eq!(known_err.error_code, Cow::Borrowed("P2028")); assert!(known_err .message - .contains("A commit cannot be executed on a closed transaction.")); + .contains("A commit cannot be executed on an expired transaction")); // Try again let res = runner.commit_tx(tx_id).await?; @@ -101,7 +100,7 @@ mod interactive_tx { assert_eq!(known_err.error_code, Cow::Borrowed("P2028")); assert!(known_err .message - .contains("A commit cannot be executed on a closed transaction.")); + .contains("A commit cannot be executed on an expired transaction")); Ok(()) } @@ -287,7 +286,7 @@ mod interactive_tx { assert_eq!(known_err.error_code, Cow::Borrowed("P2028")); assert!(known_err .message - .contains("A commit cannot be executed on a closed transaction.")); + .contains("A commit cannot be executed on an expired transaction")); // Expect the state of the tx to be expired so the rollback should fail. let res = runner.rollback_tx(tx_id.clone()).await?; @@ -297,14 +296,14 @@ mod interactive_tx { assert_eq!(known_err.error_code, Cow::Borrowed("P2028")); assert!(known_err .message - .contains("A rollback cannot be executed on a closed transaction.")); + .contains("A rollback cannot be executed on an expired transaction")); // Expect the state of the tx to be expired so the query should fail. assert_error!( runner, r#"{ findManyTestModel { id } }"#, 2028, - "A query cannot be executed on a closed transaction." + "A query cannot be executed on an expired transaction" ); runner @@ -319,7 +318,7 @@ mod interactive_tx { .await? .assert_failure( 2028, - Some("A batch query cannot be executed on a closed transaction.".to_string()), + Some("A batch query cannot be executed on an expired transaction".to_string()), ); Ok(()) @@ -419,6 +418,154 @@ mod interactive_tx { Ok(()) } + + #[connector_test] + async fn double_commit(mut runner: Runner) -> TestResult<()> { + let tx_id = runner.start_tx(5000, 5000, None).await?; + runner.set_active_tx(tx_id.clone()); + + insta::assert_snapshot!( + run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#), + @r###"{"data":{"createOneTestModel":{"id":1}}}"### + ); + + // First commit must be successful + let res = runner.commit_tx(tx_id.clone()).await?; + assert!(res.is_ok()); + + // Second commit + let res = runner.commit_tx(tx_id).await?; + assert!(res.is_err()); + + runner.clear_active_tx(); + + let error = res.err().unwrap(); + let known_err = error.as_known().unwrap(); + + assert_eq!(known_err.error_code, Cow::Borrowed("P2028")); + assert!(known_err + .message + .contains("A commit cannot be executed on a committed transaction")); + + // The first commit must have worked + insta::assert_snapshot!( + run_query!(&runner, r#"query { findManyTestModel { id field }}"#), + @r###"{"data":{"findManyTestModel":[{"id":1,"field":null}]}}"### + ); + + Ok(()) + } + + #[connector_test] + async fn double_rollback(mut runner: Runner) -> TestResult<()> { + let tx_id = runner.start_tx(5000, 5000, None).await?; + runner.set_active_tx(tx_id.clone()); + + insta::assert_snapshot!( + run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#), + @r###"{"data":{"createOneTestModel":{"id":1}}}"### + ); + + // First rollback must be successful + let res = runner.rollback_tx(tx_id.clone()).await?; + assert!(res.is_ok()); + + // Second rollback must return error + let res = runner.rollback_tx(tx_id).await?; + assert!(res.is_err()); + + runner.clear_active_tx(); + + let error = res.err().unwrap(); + let known_err = error.as_known().unwrap(); + + assert_eq!(known_err.error_code, Cow::Borrowed("P2028")); + assert!(known_err + .message + .contains("A rollback cannot be executed on a transaction that was rolled back")); + + // Check that the rollback still worked + insta::assert_snapshot!( + run_query!(&runner, r#"query { findManyTestModel { id field }}"#), + @r###"{"data":{"findManyTestModel":[]}}"### + ); + + Ok(()) + } + + #[connector_test] + async fn commit_after_rollback(mut runner: Runner) -> TestResult<()> { + let tx_id = runner.start_tx(5000, 5000, None).await?; + runner.set_active_tx(tx_id.clone()); + + insta::assert_snapshot!( + run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#), + @r###"{"data":{"createOneTestModel":{"id":1}}}"### + ); + + // Rollback must be successful + let res = runner.rollback_tx(tx_id.clone()).await?; + assert!(res.is_ok()); + + // Commit must fail + let res = runner.commit_tx(tx_id).await?; + assert!(res.is_err()); + + runner.clear_active_tx(); + + let error = res.err().unwrap(); + let known_err = error.as_known().unwrap(); + + assert_eq!(known_err.error_code, Cow::Borrowed("P2028")); + assert!(known_err + .message + .contains("A commit cannot be executed on a transaction that was rolled back")); + + // Check that the commit didn't work + insta::assert_snapshot!( + run_query!(&runner, r#"query { findManyTestModel { id field }}"#), + @r###"{"data":{"findManyTestModel":[]}}"### + ); + + Ok(()) + } + + #[connector_test] + async fn rollback_after_commit(mut runner: Runner) -> TestResult<()> { + let tx_id = runner.start_tx(5000, 5000, None).await?; + runner.set_active_tx(tx_id.clone()); + + insta::assert_snapshot!( + run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#), + @r###"{"data":{"createOneTestModel":{"id":1}}}"### + ); + + // Commit must be successful + let res = runner.commit_tx(tx_id.clone()).await?; + assert!(res.is_ok()); + + // Rollback must fail + let res = runner.rollback_tx(tx_id).await?; + assert!(res.is_err()); + + runner.clear_active_tx(); + + let error = res.err().unwrap(); + let known_err = error.as_known().unwrap(); + + assert_eq!(known_err.error_code, Cow::Borrowed("P2028")); + assert!(known_err + .message + .contains("A rollback cannot be executed on a committed transaction")); + + // Check that the commit worked + insta::assert_snapshot!( + run_query!(&runner, r#"query { findManyTestModel { id field }}"#), + @r###"{"data":{"findManyTestModel":[{"id":1,"field":null}]}}"### + ); + + Ok(()) + } } #[test_suite(schema(generic))] diff --git a/query-engine/core/src/interactive_transactions/actor_manager.rs b/query-engine/core/src/interactive_transactions/actor_manager.rs index 0388c522d487..7c950e556f64 100644 --- a/query-engine/core/src/interactive_transactions/actor_manager.rs +++ b/query-engine/core/src/interactive_transactions/actor_manager.rs @@ -1,4 +1,4 @@ -use crate::{Operation, ResponseData}; +use crate::{ClosedTx, Operation, ResponseData}; use lru::LruCache; use once_cell::sync::Lazy; use schema::QuerySchemaRef; @@ -26,9 +26,9 @@ pub struct TransactionActorManager { pub clients: Arc>>, /// Cache of closed transactions. We keep the last N closed transactions in memory to /// return better error messages if operations are performed on closed transactions. - pub closed_txs: Arc>>, + pub closed_txs: Arc>>>, /// Channel used to signal an ITx is closed and can be moved to the list of closed transactions. - send_done: Sender, + send_done: Sender<(TxId, Option)>, /// Handle to the task in charge of clearing actors. /// Used to abort the task when the TransactionActorManager is dropped. bg_reader_clear: JoinHandle<()>, @@ -49,10 +49,10 @@ impl Default for TransactionActorManager { impl TransactionActorManager { pub fn new() -> Self { - let clients: Arc>> = Arc::new(RwLock::new(HashMap::new())); - let closed_txs: Arc>> = Arc::new(RwLock::new(LruCache::new(*CLOSED_TX_CACHE_SIZE))); + let clients = Arc::new(RwLock::new(HashMap::new())); + let closed_txs = Arc::new(RwLock::new(LruCache::new(*CLOSED_TX_CACHE_SIZE))); - let (send_done, rx) = channel::(CHANNEL_SIZE); + let (send_done, rx) = channel(CHANNEL_SIZE); let handle = spawn_client_list_clear_actor(clients.clone(), closed_txs.clone(), rx); Self { @@ -79,9 +79,30 @@ impl TransactionActorManager { async fn get_client(&self, tx_id: &TxId, from_operation: &str) -> crate::Result { if let Some(client) = self.clients.read().await.get(tx_id) { Ok(client.clone()) - } else if self.closed_txs.read().await.contains(tx_id) { + } else if let Some(closed_tx) = self.closed_txs.read().await.peek(tx_id) { Err(TransactionError::Closed { - reason: format!("A {from_operation} cannot be executed on a closed transaction."), + reason: match closed_tx { + Some(ClosedTx::Committed) => { + format!("A {from_operation} cannot be executed on a committed transaction") + } + Some(ClosedTx::RolledBack) => { + format!("A {from_operation} cannot be executed on a transaction that was rolled back") + } + Some(ClosedTx::Expired { start_time, timeout }) => { + format!( + "A {from_operation} cannot be executed on an expired transaction. \ + The timeout for this transaction was {} ms, however {} ms passed since the start \ + of the transaction. Consider increasing the interactive transaction timeout \ + or doing less work in the transaction", + timeout.as_millis(), + start_time.elapsed().as_millis(), + ) + } + None => { + error!("[{tx_id}] no details about closed transaction"); + format!("A {from_operation} cannot be executed on a closed transaction") + } + }, } .into()) } else { diff --git a/query-engine/core/src/interactive_transactions/actors.rs b/query-engine/core/src/interactive_transactions/actors.rs index bbdc79cc3fef..532cf0df16a7 100644 --- a/query-engine/core/src/interactive_transactions/actors.rs +++ b/query-engine/core/src/interactive_transactions/actors.rs @@ -1,7 +1,7 @@ use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse}; use crate::{ - execute_many_operations, execute_single_operation, set_span_link_from_trace_id, OpenTx, Operation, ResponseData, - TxId, + execute_many_operations, execute_single_operation, set_span_link_from_trace_id, ClosedTx, OpenTx, Operation, + ResponseData, TxId, }; use schema::QuerySchemaRef; use std::{collections::HashMap, sync::Arc}; @@ -11,7 +11,7 @@ use tokio::{ oneshot, RwLock, }, task::JoinHandle, - time::{self, Duration}, + time::{self, Duration, Instant}, }; use tracing::Span; use tracing_futures::Instrument; @@ -249,7 +249,7 @@ pub fn spawn_itx_actor( value: OpenTx, timeout: Duration, channel_size: usize, - send_done: Sender, + send_done: Sender<(TxId, Option)>, ) -> ITXClient { let (tx_to_server, rx_from_client) = channel::(channel_size); @@ -273,6 +273,7 @@ pub fn spawn_itx_actor( tokio::task::spawn( crate::executor::with_request_now(async move { + let start_time = Instant::now(); let sleep = time::sleep(timeout); tokio::pin!(sleep); @@ -297,7 +298,12 @@ pub fn spawn_itx_actor( trace!("[{}] completed with {}", server.id.to_string(), server.cached_tx); - let _ = send_done.send(server.id.clone()).await; + let _ = send_done + .send(( + server.id.clone(), + server.cached_tx.to_closed(start_time, server.timeout), + )) + .await; trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx); }) @@ -351,19 +357,19 @@ pub fn spawn_itx_actor( */ pub fn spawn_client_list_clear_actor( clients: Arc>>, - closed_txs: Arc>>, - mut rx: Receiver, + closed_txs: Arc>>>, + mut rx: Receiver<(TxId, Option)>, ) -> JoinHandle<()> { tokio::task::spawn(async move { loop { - if let Some(id) = rx.recv().await { + if let Some((id, closed_tx)) = rx.recv().await { trace!("removing {} from client list", id); let mut clients_guard = clients.write().await; clients_guard.remove(&id); drop(clients_guard); - closed_txs.write().await.put(id, ()); + closed_txs.write().await.put(id, closed_tx); } } }) diff --git a/query-engine/core/src/interactive_transactions/error.rs b/query-engine/core/src/interactive_transactions/error.rs index 82a16edd8003..146d69f103b5 100644 --- a/query-engine/core/src/interactive_transactions/error.rs +++ b/query-engine/core/src/interactive_transactions/error.rs @@ -8,7 +8,7 @@ pub enum TransactionError { #[error("Attempted to start a transaction inside of a transaction.")] AlreadyStarted, - #[error("Transaction not found.")] + #[error("Transaction not found. Transaction ID is invalid, refers to an old closed transaction Prisma doesn't have information about anymore, or was obtained before disconnecting.")] NotFound, #[error("Transaction already closed: {reason}.")] diff --git a/query-engine/core/src/interactive_transactions/mod.rs b/query-engine/core/src/interactive_transactions/mod.rs index e648d4408937..453403a7df2a 100644 --- a/query-engine/core/src/interactive_transactions/mod.rs +++ b/query-engine/core/src/interactive_transactions/mod.rs @@ -1,7 +1,10 @@ use crate::CoreError; use connector::{Connection, ConnectionLike, Transaction}; use std::fmt::Display; -use tokio::task::JoinHandle; +use tokio::{ + task::JoinHandle, + time::{Duration, Instant}, +}; mod actor_manager; mod actors; @@ -62,7 +65,6 @@ impl Display for TxId { pub enum CachedTx { Open(OpenTx), - Aborted, Committed, RolledBack, Expired, @@ -72,7 +74,6 @@ impl Display for CachedTx { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { CachedTx::Open(_) => write!(f, "Open"), - CachedTx::Aborted => write!(f, "Aborted"), CachedTx::Committed => write!(f, "Committed"), CachedTx::RolledBack => write!(f, "Rolled back"), CachedTx::Expired => write!(f, "Expired"), @@ -101,6 +102,15 @@ impl CachedTx { Err(CoreError::from(TransactionError::Closed { reason })) } } + + pub fn to_closed(&self, start_time: Instant, timeout: Duration) -> Option { + match self { + CachedTx::Open(_) => None, + CachedTx::Committed => Some(ClosedTx::Committed), + CachedTx::RolledBack => Some(ClosedTx::RolledBack), + CachedTx::Expired => Some(ClosedTx::Expired { start_time, timeout }), + } + } } pub struct OpenTx { @@ -140,3 +150,9 @@ impl Into for OpenTx { CachedTx::Open(self) } } + +pub enum ClosedTx { + Committed, + RolledBack, + Expired { start_time: Instant, timeout: Duration }, +}