Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qe: improve error messages for closed transactions #3391

Merged
merged 14 commits into from
Nov 23, 2022
1 change: 0 additions & 1 deletion .envrc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export PRISMA_BINARY_PATH=$(pwd)/target/release/query-engine
### QE config vars, set to testing values ###
export SQLITE_MAX_VARIABLE_NUMBER=250000 # This must be in sync with the setting in the engineer build CLI
export QUERY_BATCH_SIZE=10 # Max size of IN statement vars.
export CLOSED_TX_CLEANUP=2 # Time in seconds when a closed interactive transaction will be removed from the cache.

### QE test setup vars ###
export LOG_LEVEL=trace
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use query_engine_tests::test_suite;
use std::borrow::Cow;

/// Note that if cache expiration tests fail, make sure `CLOSED_TX_CLEANUP` is set correctly (low value like 2) from the .envrc.
#[test_suite(schema(generic))]
mod interactive_tx {
use query_engine_tests::*;
Expand Down Expand Up @@ -91,7 +90,7 @@ mod interactive_tx {
assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));
.contains("A commit cannot be executed on an expired transaction"));

// Try again
let res = runner.commit_tx(tx_id).await?;
Expand All @@ -101,7 +100,7 @@ mod interactive_tx {
assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));
.contains("A commit cannot be executed on an expired transaction"));

Ok(())
}
Expand Down Expand Up @@ -287,7 +286,7 @@ mod interactive_tx {
assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A commit cannot be executed on a closed transaction."));
.contains("A commit cannot be executed on an expired transaction"));

// Expect the state of the tx to be expired so the rollback should fail.
let res = runner.rollback_tx(tx_id.clone()).await?;
Expand All @@ -297,14 +296,14 @@ mod interactive_tx {
assert_eq!(known_err.error_code, Cow::Borrowed("P2028"));
assert!(known_err
.message
.contains("A rollback cannot be executed on a closed transaction."));
.contains("A rollback cannot be executed on an expired transaction"));

// Expect the state of the tx to be expired so the query should fail.
assert_error!(
runner,
r#"{ findManyTestModel { id } }"#,
2028,
"A query cannot be executed on a closed transaction."
"A query cannot be executed on an expired transaction"
);

runner
Expand All @@ -319,7 +318,7 @@ mod interactive_tx {
.await?
.assert_failure(
2028,
Some("A batch query cannot be executed on a closed transaction.".to_string()),
Some("A batch query cannot be executed on an expired transaction".to_string()),
);

Ok(())
Expand Down Expand Up @@ -419,6 +418,154 @@ mod interactive_tx {

Ok(())
}

#[connector_test]
async fn double_commit(mut runner: Runner) -> TestResult<()> {
    // Scenario: committing the same interactive transaction twice. The first
    // commit must succeed, the second must be rejected with P2028 and a message
    // stating that the transaction was already committed.
    let tx_id = runner.start_tx(5000, 5000, None).await?;
    runner.set_active_tx(tx_id.clone());

    insta::assert_snapshot!(
        run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#),
        @r###"{"data":{"createOneTestModel":{"id":1}}}"###
    );

    // Initial commit: expected to go through.
    let first_commit = runner.commit_tx(tx_id.clone()).await?;
    assert!(first_commit.is_ok());

    // Repeated commit on the same id: expected to be refused.
    let second_commit = runner.commit_tx(tx_id).await?;
    assert!(second_commit.is_err());

    // Detach the transaction from the runner before the verification query
    // (presumably so that query is not routed through the closed transaction).
    runner.clear_active_tx();

    let failure = second_commit.err().unwrap();
    let known = failure.as_known().unwrap();
    let expected_fragment = "A commit cannot be executed on a committed transaction";

    assert_eq!(known.error_code, Cow::Borrowed("P2028"));
    assert!(known.message.contains(expected_fragment));

    // The row written inside the transaction must be visible: the first commit took effect.
    insta::assert_snapshot!(
        run_query!(&runner, r#"query { findManyTestModel { id field }}"#),
        @r###"{"data":{"findManyTestModel":[{"id":1,"field":null}]}}"###
    );

    Ok(())
}

#[connector_test]
async fn double_rollback(mut runner: Runner) -> TestResult<()> {
    // Scenario: rolling back the same interactive transaction twice. The first
    // rollback must succeed, the second must be rejected with P2028 and a message
    // stating that the transaction was already rolled back.
    let tx_id = runner.start_tx(5000, 5000, None).await?;
    runner.set_active_tx(tx_id.clone());

    insta::assert_snapshot!(
        run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#),
        @r###"{"data":{"createOneTestModel":{"id":1}}}"###
    );

    // Initial rollback: expected to go through.
    let first_rollback = runner.rollback_tx(tx_id.clone()).await?;
    assert!(first_rollback.is_ok());

    // Repeated rollback on the same id: expected to be refused.
    let second_rollback = runner.rollback_tx(tx_id).await?;
    assert!(second_rollback.is_err());

    // Detach the transaction from the runner before the verification query
    // (presumably so that query is not routed through the closed transaction).
    runner.clear_active_tx();

    let failure = second_rollback.err().unwrap();
    let known = failure.as_known().unwrap();
    let expected_fragment = "A rollback cannot be executed on a transaction that was rolled back";

    assert_eq!(known.error_code, Cow::Borrowed("P2028"));
    assert!(known.message.contains(expected_fragment));

    // No rows must be visible: the first rollback discarded the insert.
    insta::assert_snapshot!(
        run_query!(&runner, r#"query { findManyTestModel { id field }}"#),
        @r###"{"data":{"findManyTestModel":[]}}"###
    );

    Ok(())
}

#[connector_test]
async fn commit_after_rollback(mut runner: Runner) -> TestResult<()> {
    // Scenario: a commit issued after the transaction was rolled back. The
    // rollback must succeed, the commit must be rejected with P2028 and a message
    // stating that the transaction was rolled back.
    let tx_id = runner.start_tx(5000, 5000, None).await?;
    runner.set_active_tx(tx_id.clone());

    insta::assert_snapshot!(
        run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#),
        @r###"{"data":{"createOneTestModel":{"id":1}}}"###
    );

    // Rollback: expected to go through.
    let rollback_outcome = runner.rollback_tx(tx_id.clone()).await?;
    assert!(rollback_outcome.is_ok());

    // Commit on the already rolled back id: expected to be refused.
    let commit_outcome = runner.commit_tx(tx_id).await?;
    assert!(commit_outcome.is_err());

    // Detach the transaction from the runner before the verification query
    // (presumably so that query is not routed through the closed transaction).
    runner.clear_active_tx();

    let failure = commit_outcome.err().unwrap();
    let known = failure.as_known().unwrap();
    let expected_fragment = "A commit cannot be executed on a transaction that was rolled back";

    assert_eq!(known.error_code, Cow::Borrowed("P2028"));
    assert!(known.message.contains(expected_fragment));

    // No rows must be visible: the rejected commit did not persist the insert.
    insta::assert_snapshot!(
        run_query!(&runner, r#"query { findManyTestModel { id field }}"#),
        @r###"{"data":{"findManyTestModel":[]}}"###
    );

    Ok(())
}

#[connector_test]
async fn rollback_after_commit(mut runner: Runner) -> TestResult<()> {
    // Scenario: a rollback issued after the transaction was committed. The
    // commit must succeed, the rollback must be rejected with P2028 and a message
    // stating that the transaction was committed.
    let tx_id = runner.start_tx(5000, 5000, None).await?;
    runner.set_active_tx(tx_id.clone());

    insta::assert_snapshot!(
        run_query!(&runner, r#"mutation { createOneTestModel(data: { id: 1 }) { id }}"#),
        @r###"{"data":{"createOneTestModel":{"id":1}}}"###
    );

    // Commit: expected to go through.
    let commit_outcome = runner.commit_tx(tx_id.clone()).await?;
    assert!(commit_outcome.is_ok());

    // Rollback on the already committed id: expected to be refused.
    let rollback_outcome = runner.rollback_tx(tx_id).await?;
    assert!(rollback_outcome.is_err());

    // Detach the transaction from the runner before the verification query
    // (presumably so that query is not routed through the closed transaction).
    runner.clear_active_tx();

    let failure = rollback_outcome.err().unwrap();
    let known = failure.as_known().unwrap();
    let expected_fragment = "A rollback cannot be executed on a committed transaction";

    assert_eq!(known.error_code, Cow::Borrowed("P2028"));
    assert!(known.message.contains(expected_fragment));

    // The row must still be visible: the rejected rollback did not undo the commit.
    insta::assert_snapshot!(
        run_query!(&runner, r#"query { findManyTestModel { id field }}"#),
        @r###"{"data":{"findManyTestModel":[{"id":1,"field":null}]}}"###
    );

    Ok(())
}
}

#[test_suite(schema(generic))]
Expand Down
37 changes: 29 additions & 8 deletions query-engine/core/src/interactive_transactions/actor_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Operation, ResponseData};
use crate::{ClosedTx, Operation, ResponseData};
use lru::LruCache;
use once_cell::sync::Lazy;
use schema::QuerySchemaRef;
Expand Down Expand Up @@ -26,9 +26,9 @@ pub struct TransactionActorManager {
pub clients: Arc<RwLock<HashMap<TxId, ITXClient>>>,
/// Cache of closed transactions. We keep the last N closed transactions in memory to
/// return better error messages if operations are performed on closed transactions.
pub closed_txs: Arc<RwLock<LruCache<TxId, ()>>>,
pub closed_txs: Arc<RwLock<LruCache<TxId, Option<ClosedTx>>>>,
/// Channel used to signal an ITx is closed and can be moved to the list of closed transactions.
send_done: Sender<TxId>,
send_done: Sender<(TxId, Option<ClosedTx>)>,
/// Handle to the task in charge of clearing actors.
/// Used to abort the task when the TransactionActorManager is dropped.
bg_reader_clear: JoinHandle<()>,
Expand All @@ -49,10 +49,10 @@ impl Default for TransactionActorManager {

impl TransactionActorManager {
pub fn new() -> Self {
let clients: Arc<RwLock<HashMap<TxId, ITXClient>>> = Arc::new(RwLock::new(HashMap::new()));
let closed_txs: Arc<RwLock<LruCache<TxId, ()>>> = Arc::new(RwLock::new(LruCache::new(*CLOSED_TX_CACHE_SIZE)));
let clients = Arc::new(RwLock::new(HashMap::new()));
let closed_txs = Arc::new(RwLock::new(LruCache::new(*CLOSED_TX_CACHE_SIZE)));

let (send_done, rx) = channel::<TxId>(CHANNEL_SIZE);
let (send_done, rx) = channel(CHANNEL_SIZE);
let handle = spawn_client_list_clear_actor(clients.clone(), closed_txs.clone(), rx);

Self {
Expand All @@ -79,9 +79,30 @@ impl TransactionActorManager {
async fn get_client(&self, tx_id: &TxId, from_operation: &str) -> crate::Result<ITXClient> {
if let Some(client) = self.clients.read().await.get(tx_id) {
Ok(client.clone())
} else if self.closed_txs.read().await.contains(tx_id) {
} else if let Some(closed_tx) = self.closed_txs.read().await.peek(tx_id) {
Err(TransactionError::Closed {
reason: format!("A {from_operation} cannot be executed on a closed transaction."),
reason: match closed_tx {
Some(ClosedTx::Committed) => {
format!("A {from_operation} cannot be executed on a committed transaction")
}
Some(ClosedTx::RolledBack) => {
format!("A {from_operation} cannot be executed on a transaction that was rolled back")
}
Some(ClosedTx::Expired { start_time, timeout }) => {
format!(
"A {from_operation} cannot be executed on an expired transaction. \
The timeout for this transaction was {} ms, however {} ms passed since the start \
of the transaction. Consider increasing the interactive transaction timeout \
or doing less work in the transaction",
timeout.as_millis(),
start_time.elapsed().as_millis(),
)
}
None => {
error!("[{tx_id}] no details about closed transaction");
format!("A {from_operation} cannot be executed on a closed transaction")
}
},
}
.into())
} else {
Expand Down
24 changes: 15 additions & 9 deletions query-engine/core/src/interactive_transactions/actors.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{CachedTx, TransactionError, TxOpRequest, TxOpRequestMsg, TxOpResponse};
use crate::{
execute_many_operations, execute_single_operation, set_span_link_from_trace_id, OpenTx, Operation, ResponseData,
TxId,
execute_many_operations, execute_single_operation, set_span_link_from_trace_id, ClosedTx, OpenTx, Operation,
ResponseData, TxId,
};
use schema::QuerySchemaRef;
use std::{collections::HashMap, sync::Arc};
Expand All @@ -11,7 +11,7 @@ use tokio::{
oneshot, RwLock,
},
task::JoinHandle,
time::{self, Duration},
time::{self, Duration, Instant},
};
use tracing::Span;
use tracing_futures::Instrument;
Expand Down Expand Up @@ -249,7 +249,7 @@ pub fn spawn_itx_actor(
value: OpenTx,
timeout: Duration,
channel_size: usize,
send_done: Sender<TxId>,
send_done: Sender<(TxId, Option<ClosedTx>)>,
) -> ITXClient {
let (tx_to_server, rx_from_client) = channel::<TxOpRequest>(channel_size);

Expand All @@ -273,6 +273,7 @@ pub fn spawn_itx_actor(

tokio::task::spawn(
crate::executor::with_request_now(async move {
let start_time = Instant::now();
let sleep = time::sleep(timeout);
tokio::pin!(sleep);

Expand All @@ -297,7 +298,12 @@ pub fn spawn_itx_actor(

trace!("[{}] completed with {}", server.id.to_string(), server.cached_tx);

let _ = send_done.send(server.id.clone()).await;
let _ = send_done
.send((
server.id.clone(),
server.cached_tx.to_closed(start_time, server.timeout),
))
.await;

trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx);
})
Expand Down Expand Up @@ -351,19 +357,19 @@ pub fn spawn_itx_actor(
*/
pub fn spawn_client_list_clear_actor(
clients: Arc<RwLock<HashMap<TxId, ITXClient>>>,
closed_txs: Arc<RwLock<lru::LruCache<TxId, ()>>>,
mut rx: Receiver<TxId>,
closed_txs: Arc<RwLock<lru::LruCache<TxId, Option<ClosedTx>>>>,
mut rx: Receiver<(TxId, Option<ClosedTx>)>,
) -> JoinHandle<()> {
tokio::task::spawn(async move {
loop {
if let Some(id) = rx.recv().await {
if let Some((id, closed_tx)) = rx.recv().await {
trace!("removing {} from client list", id);

let mut clients_guard = clients.write().await;
clients_guard.remove(&id);
drop(clients_guard);

closed_txs.write().await.put(id, ());
closed_txs.write().await.put(id, closed_tx);
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion query-engine/core/src/interactive_transactions/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub enum TransactionError {
#[error("Attempted to start a transaction inside of a transaction.")]
AlreadyStarted,

#[error("Transaction not found.")]
#[error("Transaction not found. Transaction ID is invalid, refers to an old closed transaction Prisma doesn't have information about anymore, or was obtained before disconnecting.")]
NotFound,

#[error("Transaction already closed: {reason}.")]
Expand Down