Skip to content

Commit

Permalink
qe: improve error messages for closed transactions (#3391)
Browse files Browse the repository at this point in the history
Make the error messages for closed transactions more detailed, add extra context about the transaction timeout.

  Before:

  - `Transaction API error: Transaction already closed: A query/commit/rollback cannot be executed on a closed transaction..`
  (also note the double "." at the end)

  Now:

  - `Transaction API error: Transaction already closed: A query/commit/rollback cannot be executed on a committed transaction.`
  - `Transaction API error: Transaction already closed:  A query/commit/rollback cannot be executed on a transaction that was rolled back.`
  - `Transaction API error: Transaction already closed:  A query/commit/rollback cannot be executed on an expired transaction. The timeout for this transaction was X ms, however Y ms passed since the start of the transaction. Consider increasing the interactive transaction timeout or doing less work in the transaction.`

Additionally, the "Transaction not found error" is now also more verbose, ref: https://www.notion.so/disconnect-with-iTX-f3cfee3ff4924e40aa90aadb2454e9fa?d=3bd7c7103b02461bbfe414a978a994c1#547ab127682b41898c87bdd5c841c0bf

Also contains minor cleanup things related to iTX:

* Remove the unused `CachedTx::Aborted` variant
* Remove references to an obsolete env var that doesn't exist since #3028 from comments and `.envrc`

Client PR: prisma/prisma#16382

Closes: prisma/prisma#13713
Ref: prisma/prisma#16050
Ref: #3028
  • Loading branch information
aqrln committed Nov 23, 2022
1 parent fe21a0f commit fb37317
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 29 deletions.
1 change: 0 additions & 1 deletion .envrc
Expand Up @@ -12,7 +12,6 @@ export PRISMA_BINARY_PATH=$(pwd)/target/release/query-engine
### QE config vars, set to testing values ###
export SQLITE_MAX_VARIABLE_NUMBER=250000 # This must be in sync with the setting in the engineer build CLI
export QUERY_BATCH_SIZE=10 # Max size of IN statement vars.
export CLOSED_TX_CLEANUP=2 # Time in seconds when a closed interactive transaction will be removed from the cache.

### QE test setup vars ###
export LOG_LEVEL=trace
Expand Down
@@ -1,7 +1,6 @@
use query_engine_tests::test_suite;
use std::borrow::Cow;

/// Note that if cache expiration tests fail, make sure `CLOSED_TX_CLEANUP` is set correctly (low value like 2) from the .envrc.
#[test_suite(schema(generic))]
mod interactive_tx {
use query_engine_tests::*;
Expand Down Expand Up @@ -91,7 +90,7 @@ mod interactive_tx {
assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));
.contains("A commit cannot be executed on an expired transaction"));

// Try again
let res = runner.commit_tx(tx_id).await?;
Expand All @@ -101,7 +100,7 @@ mod interactive_tx {
assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));
.contains("A commit cannot be executed on an expired transaction"));

Ok(())
}
Expand Down Expand Up @@ -287,7 +286,7 @@ mod interactive_tx {
assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));
.contains("A commit cannot be executed on an expired transaction"));

// Expect the state of the tx to be expired so the rollback should fail.
let res = runner.rollback_tx(tx_id.clone()).await?;
Expand All @@ -297,14 +296,14 @@ mod interactive_tx {
assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A rollback cannot be executed on a closed transaction."));
.contains("A rollback cannot be executed on an expired transaction"));

// Expect the state of the tx to be expired so the query should fail.
assert_error!(
runner,
r#"{ findManyTestModel { id } }"#,
2028,
"A query cannot be executed on a closed transaction."
"A query cannot be executed on an expired transaction"
);

runner
Expand All @@ -319,7 +318,7 @@ mod interactive_tx {
.await?
.assert_failure(
2028,
Some("A batch query cannot be executed on a closed transaction.".to_string()),
Some("A batch query cannot be executed on an expired transaction".to_string()),
);

Ok(())
Expand Down Expand Up @@ -419,6 +418,154 @@ mod interactive_tx {

Ok(())
}

#[connector_test]
async fn double_commit(mut runner: Runner) -> TestResult<()> {
let tx_id = runner.start_tx(5000, 5000, None).await?;
runner.set_active_tx(tx_id.clone());

insta::assert_snapshot!(
run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#),
@r###"{"data":{"createOneTestModel":{"id":1}}}"###
);

// First commit must be successful
let res = runner.commit_tx(tx_id.clone()).await?;
assert!(res.is_ok());

// Second commit
let res = runner.commit_tx(tx_id).await?;
assert!(res.is_err());

runner.clear_active_tx();

let error = res.err().unwrap();
let known_err = error.as_known().unwrap();

assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A commit cannot be executed on a committed transaction"));

// The first commit must have worked
insta::assert_snapshot!(
run_query!(&runner, r#"query { findManyTestModel { id field }}"#),
@r###"{"data":{"findManyTestModel":[{"id":1,"field":null}]}}"###
);

Ok(())
}

#[connector_test]
async fn double_rollback(mut runner: Runner) -> TestResult<()> {
let tx_id = runner.start_tx(5000, 5000, None).await?;
runner.set_active_tx(tx_id.clone());

insta::assert_snapshot!(
run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#),
@r###"{"data":{"createOneTestModel":{"id":1}}}"###
);

// First rollback must be successful
let res = runner.rollback_tx(tx_id.clone()).await?;
assert!(res.is_ok());

// Second rollback must return error
let res = runner.rollback_tx(tx_id).await?;
assert!(res.is_err());

runner.clear_active_tx();

let error = res.err().unwrap();
let known_err = error.as_known().unwrap();

assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A rollback cannot be executed on a transaction that was rolled back"));

// Check that the rollback still worked
insta::assert_snapshot!(
run_query!(&runner, r#"query { findManyTestModel { id field }}"#),
@r###"{"data":{"findManyTestModel":[]}}"###
);

Ok(())
}

#[connector_test]
async fn commit_after_rollback(mut runner: Runner) -> TestResult<()> {
let tx_id = runner.start_tx(5000, 5000, None).await?;
runner.set_active_tx(tx_id.clone());

insta::assert_snapshot!(
run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#),
@r###"{"data":{"createOneTestModel":{"id":1}}}"###
);

// Rollback must be successful
let res = runner.rollback_tx(tx_id.clone()).await?;
assert!(res.is_ok());

// Commit must fail
let res = runner.commit_tx(tx_id).await?;
assert!(res.is_err());

runner.clear_active_tx();

let error = res.err().unwrap();
let known_err = error.as_known().unwrap();

assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A commit cannot be executed on a transaction that was rolled back"));

// Check that the commit didn't work
insta::assert_snapshot!(
run_query!(&runner, r#"query { findManyTestModel { id field }}"#),
@r###"{"data":{"findManyTestModel":[]}}"###
);

Ok(())
}

#[connector_test]
async fn rollback_after_commit(mut runner: Runner) -> TestResult<()> {
let tx_id = runner.start_tx(5000, 5000, None).await?;
runner.set_active_tx(tx_id.clone());

insta::assert_snapshot!(
run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#),
@r###"{"data":{"createOneTestModel":{"id":1}}}"###
);

// Commit must be successful
let res = runner.commit_tx(tx_id.clone()).await?;
assert!(res.is_ok());

// Rollback must fail
let res = runner.rollback_tx(tx_id).await?;
assert!(res.is_err());

runner.clear_active_tx();

let error = res.err().unwrap();
let known_err = error.as_known().unwrap();

assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A rollback cannot be executed on a committed transaction"));

// Check that the commit worked
insta::assert_snapshot!(
run_query!(&runner, r#"query { findManyTestModel { id field }}"#),
@r###"{"data":{"findManyTestModel":[{"id":1,"field":null}]}}"###
);

Ok(())
}
}

#[test_suite(schema(generic))]
Expand Down
37 changes: 29 additions & 8 deletions query-engine/core/src/interactive_transactions/actor_manager.rs
@@ -1,4 +1,4 @@
use crate::{Operation, ResponseData};
use crate::{ClosedTx, Operation, ResponseData};
use lru::LruCache;
use once_cell::sync::Lazy;
use schema::QuerySchemaRef;
Expand Down Expand Up @@ -26,9 +26,9 @@ pub struct TransactionActorManager {
pub clients: Arc<RwLock<HashMap<TxId, ITXClient>>>,
/// Cache of closed transactions. We keep the last N closed transactions in memory to
/// return better error messages if operations are performed on closed transactions.
pub closed_txs: Arc<RwLock<LruCache<TxId, ()>>>,
pub closed_txs: Arc<RwLock<LruCache<TxId, Option<ClosedTx>>>>,
/// Channel used to signal an ITx is closed and can be moved to the list of closed transactions.
send_done: Sender<TxId>,
send_done: Sender<(TxId, Option<ClosedTx>)>,
/// Handle to the task in charge of clearing actors.
/// Used to abort the task when the TransactionActorManager is dropped.
bg_reader_clear: JoinHandle<()>,
Expand All @@ -49,10 +49,10 @@ impl Default for TransactionActorManager {

impl TransactionActorManager {
pub fn new() -> Self {
let clients: Arc<RwLock<HashMap<TxId, ITXClient>>> = Arc::new(RwLock::new(HashMap::new()));
let closed_txs: Arc<RwLock<LruCache<TxId, ()>>> = Arc::new(RwLock::new(LruCache::new(*CLOSED_TX_CACHE_SIZE)));
let clients = Arc::new(RwLock::new(HashMap::new()));
let closed_txs = Arc::new(RwLock::new(LruCache::new(*CLOSED_TX_CACHE_SIZE)));

let (send_done, rx) = channel::<TxId>(CHANNEL_SIZE);
let (send_done, rx) = channel(CHANNEL_SIZE);
let handle = spawn_client_list_clear_actor(clients.clone(), closed_txs.clone(), rx);

Self {
Expand All @@ -79,9 +79,30 @@ impl TransactionActorManager {
async fn get_client(&self, tx_id: &TxId, from_operation: &str) -> crate::Result<ITXClient> {
if let Some(client) = self.clients.read().await.get(tx_id) {
Ok(client.clone())
} else if self.closed_txs.read().await.contains(tx_id) {
} else if let Some(closed_tx) = self.closed_txs.read().await.peek(tx_id) {
Err(TransactionError::Closed {
reason: format!("A {from_operation} cannot be executed on a closed transaction."),
reason: match closed_tx {
Some(ClosedTx::Committed) => {
format!("A {from_operation} cannot be executed on a committed transaction")
}
Some(ClosedTx::RolledBack) => {
format!("A {from_operation} cannot be executed on a transaction that was rolled back")
}
Some(ClosedTx::Expired { start_time, timeout }) => {
format!(
"A {from_operation} cannot be executed on an expired transaction. \
The timeout for this transaction was {} ms, however {} ms passed since the start \
of the transaction. Consider increasing the interactive transaction timeout \
or doing less work in the transaction",
timeout.as_millis(),
start_time.elapsed().as_millis(),
)
}
None => {
error!("[{tx_id}] no details about closed transaction");
format!("A {from_operation} cannot be executed on a closed transaction")
}
},
}
.into())
} else {
Expand Down
24 changes: 15 additions & 9 deletions query-engine/core/src/interactive_transactions/actors.rs
@@ -1,7 +1,7 @@
use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse};
use crate::{
execute_many_operations, execute_single_operation, set_span_link_from_trace_id, OpenTx, Operation, ResponseData,
TxId,
execute_many_operations, execute_single_operation, set_span_link_from_trace_id, ClosedTx, OpenTx, Operation,
ResponseData, TxId,
};
use schema::QuerySchemaRef;
use std::{collections::HashMap, sync::Arc};
Expand All @@ -11,7 +11,7 @@ use tokio::{
oneshot, RwLock,
},
task::JoinHandle,
time::{self, Duration},
time::{self, Duration, Instant},
};
use tracing::Span;
use tracing_futures::Instrument;
Expand Down Expand Up @@ -249,7 +249,7 @@ pub fn spawn_itx_actor(
value: OpenTx,
timeout: Duration,
channel_size: usize,
send_done: Sender<TxId>,
send_done: Sender<(TxId, Option<ClosedTx>)>,
) -> ITXClient {
let (tx_to_server, rx_from_client) = channel::<TxOpRequest>(channel_size);

Expand All @@ -273,6 +273,7 @@ pub fn spawn_itx_actor(

tokio::task::spawn(
crate::executor::with_request_now(async move {
let start_time = Instant::now();
let sleep = time::sleep(timeout);
tokio::pin!(sleep);

Expand All @@ -297,7 +298,12 @@ pub fn spawn_itx_actor(

trace!("[{}] completed with {}", server.id.to_string(), server.cached_tx);

let _ = send_done.send(server.id.clone()).await;
let _ = send_done
.send((
server.id.clone(),
server.cached_tx.to_closed(start_time, server.timeout),
))
.await;

trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx);
})
Expand Down Expand Up @@ -351,19 +357,19 @@ pub fn spawn_itx_actor(
*/
pub fn spawn_client_list_clear_actor(
clients: Arc<RwLock<HashMap<TxId, ITXClient>>>,
closed_txs: Arc<RwLock<lru::LruCache<TxId, ()>>>,
mut rx: Receiver<TxId>,
closed_txs: Arc<RwLock<lru::LruCache<TxId, Option<ClosedTx>>>>,
mut rx: Receiver<(TxId, Option<ClosedTx>)>,
) -> JoinHandle<()> {
tokio::task::spawn(async move {
loop {
if let Some(id) = rx.recv().await {
if let Some((id, closed_tx)) = rx.recv().await {
trace!("removing {} from client list", id);

let mut clients_guard = clients.write().await;
clients_guard.remove(&id);
drop(clients_guard);

closed_txs.write().await.put(id, ());
closed_txs.write().await.put(id, closed_tx);
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion query-engine/core/src/interactive_transactions/error.rs
Expand Up @@ -8,7 +8,7 @@ pub enum TransactionError {
#[error("Attempted to start a transaction inside of a transaction.")]
AlreadyStarted,

#[error("Transaction not found.")]
#[error("Transaction not found. Transaction ID is invalid, refers to an old closed transaction Prisma doesn't have information about anymore, or was obtained before disconnecting.")]
NotFound,

#[error("Transaction already closed: {reason}.")]
Expand Down

0 comments on commit fb37317

Please sign in to comment.