Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inner suspended transaction doesn't roll back when outer transaction throws #1586

Open
pablo-silverback opened this issue Sep 3, 2022 · 16 comments

Comments

@pablo-silverback
Copy link

pablo-silverback commented Sep 3, 2022

In a context of suspended parent/child transactions, when there's an error after the child block ended the transaction doesn't roll back.

Pseudocode:

newSuspendedTransaction {
  insert(A)
  newSuspendedTransaction {
    insert(B)
  }
  throw Exception()
}

In this scenario, B is inserted on the database (transaction doesn't roll back).

Note that replacing newSuspendedTransaction with transaction (not suspended) works as expected.

Here's a complete working test that illustrates the issue:

object ExampleTable : LongIdTable("transaction_example_table") {
    private val value = text("value")

    fun add(v: String) = ExampleTable.insert { it[value] = v }

    fun getAll(): List<String> = selectAll().map { it[value] }
}

class TransactionTests {
    companion object {
        private lateinit var db: Database

        @JvmStatic
        @BeforeAll
        fun setup() {
            val ds = PGSimpleDataSource()
            ds.setUrl("jdbc:postgresql://localhost:5432/test_db?user=user&password=pass")
            db = Database.connect(ds as DataSource)
            transaction(db) {
                SchemaUtils.drop(ExampleTable)
                SchemaUtils.create(ExampleTable)
            }
        }
    }

    @BeforeEach
    fun deleteAll() {
        transaction(db) { ExampleTable.deleteAll() }
    }

    private fun assertNoRows() = Assertions.assertEquals(0, ExampleTable.selectAll().count())

    @Test
    fun `inner new suspended transactions don't rollback when outer throws`(): Unit = runBlocking {
        db.useNestedTransactions = true // this doesn't seem to change anything
        try {
            newSuspendedTransaction(Dispatchers.Default, db) {
                assertNoRows()
                ExampleTable.add("outer")

                newSuspendedTransaction(Dispatchers.Default, db) {
                    ExampleTable.add("inner")
                }
                throw Exception("outer transaction throws")
            }
        } catch (e: Exception) {
            transaction(db) {
                // this is the problem
                // assertNoRows() -> this would fail
                Assertions.assertEquals("inner", ExampleTable.getAll().single())
            }
        }
    }
}
@thamidu
Copy link

thamidu commented Sep 5, 2022

It's somewhere in the documentation.

The reason is when creating a suspendedTransaction, it starts a transaction as a separate new one. So it really doesn't do anything if you start a txn inside another txn. It always will be a separate one. So in this case, the child's txn will still be complete while the parent gets rollback.

This is a known issue in this framework. The only thing you can do is use threaded-based web frameworks like Servelets, and Spring MVC or wait till the Exposed RDBC implementation which is currently under development.

Edit: As @AlexeySoshin it's not an issue. Still, you can use JDBC with coroutines, but the downside is it blocks the thread till the SQL server returns the result. I did some digging and posted about that in the comments down below. You can have a look if you are interested. Sorry for the confusion.

@AlexeySoshin
Copy link
Contributor

AlexeySoshin commented Sep 5, 2022

@pablo-silverback
Thank you for providing a test case to reproduce.

Could you try the following code, please, and tell me if this is the behavior you were expecting?

newSuspendedTransaction(Dispatchers.Default, db) {
    assertNoRows()
    ExampleTable.add("outer")

    suspendedTransaction {
        ExampleTable.add("inner")
    }
    throw Exception("outer transaction throws")
}

@pablo-silverback
Copy link
Author

@AlexeySoshin thanks! that fixes the test case. The problem is that our code is not that simple and it looks more like this:

newSuspendedTransaction(Dispatchers.Default, db) {
    assertNoRows()
    ExampleTable.add("outer")
    insertInner()
    throw Exception("outer transaction throws")
}

private fun insertInner() {
  suspendedTransaction { // -> obviously doesn't compile
    ExampleTable.add("inner")
  }
}

That's not an issue with transaction since we can reference the db object. What's the "exposed way" of running nested suspended transactions that may span multiple methods?

@pablo-silverback
Copy link
Author

@AlexeySoshin Also, if it's a known issue like @thamidu points out, I'm happy to work on a fix for it (with some pointers of where to look). We rely heavily on exposed and would love to help improve it 🤗 (instead of work around it)

@AlexeySoshin
Copy link
Contributor

AlexeySoshin commented Sep 8, 2022

@pablo-silverback

Thank you for the proposal, and contributions would be more than welcome, but I disagree with @thamidu that it's an issue at all.

The reason the above code doesn't compile is that:

  1. The function is not suspended, and suspendedTransactions requires a suspended context
  2. suspendedTransaction is declared an extension function on Transaction

The following code compiles and works as expected:

    private suspend fun Transaction.insertInner() {
        suspendedTransaction { // -> compiles
            ExampleTable.add("inner")
        }
    }    

Hope that helps.

@lucas-avila-aside
Copy link

lucas-avila-aside commented Sep 8, 2022

@AlexeySoshin hi 👋
Yes, that's right. With an extension method would work but what do you think about the following scenario:

suspend fun A() {
   newSuspendedTransaction(Dispatchers.Default, db) {
      assertNoRows()
      ExampleTable.add("outer")
      insertInner()
  }
}

private suspend fun Transaction.insertInner() {
        suspendedTransaction {
            ExampleTable.add("inner")
        }
    } 

And then let's say you have somewhere, in another class/module/package/etc:

suspend fun B() {
   newSuspendedTransaction(Dispatchers.Default, db) {
      A()
      ... // more code
      throw Exception("outer outer transaction throws")
  }
}

In this case A() won't be rollbacked because it's creating a newSuspendedTransaction.

I think there are ways of structuring the code to try to work-around this but wouldn't it be nice for Exposed to notice that if the transaction is inside another one it has to treat it as the extension case?

@pablo-silverback
Copy link
Author

@AlexeySoshin We have scenarios where insertInner is used as a standalone method (without a parent transaction). This is why we have nested newSuspendedTransactions. Exposed works seamlessly with non-supspended transaction calls for these scenarios and, as I understand it, there is no suspended equivalent.

My question then would be, what's the recommended way of dealing with this scenario (suspended tx calls that may or may not be nested)?

@leoneparise
Copy link
Contributor

leoneparise commented Sep 9, 2022

I had this scenario and created an utility function which to solve it:

suspend fun <T> inTransaction(
    transaction: Transaction? = null,
    context: CoroutineDispatcher = Dispatchers.IO,
    db: Database? = null,
    transactionIsolation: Int? = null,
    statement: suspend (Transaction) -> T
): T = transaction
    ?.suspendedTransaction(
        context = context,
        statement = { statement(this) }
    )
    ?: newSuspendedTransaction(
        context = context,
        db = db,
        transactionIsolation = transactionIsolation,
        statement = { statement(this) }
    )

@pablo-silverback
Copy link
Author

pablo-silverback commented Sep 12, 2022

@leoneparise thanks! using that definition of inTransaction this test case passes (when it should not)

try {
    inTransaction(context = Dispatchers.Default, db = db) {
        assertNoRows()
        ExampleTable.add("outer")

        inTransaction(context = Dispatchers.Default, db = db) {
            ExampleTable.add("inner")
        }
        throw Exception("outer transaction throws")
    }
} catch (e: Exception) {
    transaction(db) {
        Assertions.assertEquals("inner", ExampleTable.getAll().single())
    }
}

I assume the second call it's expected to be inTransaction(transaction = it, context = Dispatchers.Default, db = db) but then we're at square one again: the caller of inTransaction needs to know if it's running inside another transaction or not.

Note this is not required with the ordinary transaction call.

@thamidu
Copy link

thamidu commented Sep 14, 2022

I did some dig up on how those things works. So I recreated all of the above codes and compared them with the SQL query log of my MariaDB Server. Those are my findings.

Specs:

MariaDB - 10.2.11
Ktor - 2.0.1
Exposed - 0.38.2
mysql-connector-java - 8.0.29
HikariCP - 5.0.1

Hikari Connection Pooling with default properties and the additional properties below

cachePrepStmts = true
prepStmtCacheSize = 250
prepStmtCacheSqlLimit = 2048

Case 01 (Initial question)

Code -

suspend fun testNested(): Nothing {
    newSuspendedTransaction {
        Students.selectAll().count()

        newSuspendedTransaction {
            Students.update({Students.id eq 1}) {  st -> st[Students.status] = 0 }
        }

        throw Exception()
    }
}

MariaDB Log -

Id Command	Argument

12 Query	SET autocommit=0
12 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query	SELECT COUNT(*) FROM students
13 Query	SET autocommit=0
13 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
13 Query	UPDATE students SET status=0 WHERE students.id = 1
13 Query	commit
13 Query	SET autocommit=1
12 Query	rollback
12 Query	SET autocommit=1

Conclusion - As I mentioned in my previous comment, when you call (create) newSuspendedTransaction { } inside a parent txn, it will start as a separate txn. That's why the parent txn started in thread 12 and the child txn started in thread 13. That's why the child txn gets complete while the parent gets rollback.

Case 2 (@AlexeySoshin Solution)

Code -

suspend fun testNested(): Nothing {
    newSuspendedTransaction {
        Students.selectAll().count()

        suspendedTransaction {
            Students.update({Students.id eq 1}) {  st -> st[Students.status] = 0 }
        }

        throw Exception()
    }
}

MariaDB Log -

Id Command	Argument

12 Query	SET autocommit=0
12 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query	SELECT COUNT(*) FROM students
12 Query	UPDATE students SET status=0 WHERE students.id = 1
12 Query	rollback
12 Query	SET autocommit=1

Conclusion - This is the way. When you declare a suspendedTransaction { } inside a newSuspendedTransaction { }, it will takes the parent txn (scope's transaction) as it's txn. So basically the child txn's code will be merged into the parent's txn, which results a single txn. That's why there is not any other separate thread for the child txn. It's all in thread 12. So if there's any error at some point, the whole code will roll back.

Case 3 (@leoneparise solution)

This also output the same as Case 2, but with an exception. You need to pass the parent's txn to the child.
as mentioned in

I assume the second call it's expected to be inTransaction(transaction = it, context = Dispatchers.Default, db = db) but then we're at square one again: the caller of inTransaction needs to know if it's running inside another transaction or not.

My implementation

I also have this same scenario just like yours. In my project, I also have some functions which calls directly and inside another function. So here is my implementation which also fixes the issue you raised and does the same function as Case 2 and 3

suspend fun <T> susTxn(
    db: Database? = null,
    statement: suspend (Transaction) -> T
): T = withContext(Dispatchers.IO){
    TransactionManager.currentOrNull()
        ?.let { it.suspendedTransaction { statement(this) } }
        ?: newSuspendedTransaction(db = db) { statement(this) }
}

In this case, you don't have to pass parent's transaction or whats or ever. You can use your functions as a standalone or inside another transaction. Please note that this implementation has not been tested. So please, use it with caution. I am not sure about how it will behave under many requests concurrently. But currently, we use this method in our development builds (not released to production yet) to fetch/persist data. No issue so far.

Test Case (For the solution above with parrarel requests)

Code -

// KTor Route
get("/test") {
    testNested()
    call.respond(HttpStatusCode.OK)
}

suspend fun testNested() = susTxn {
    Students.selectAll().count()

    innerTxn()

    throw Exception("Exception occurred")
    null
}

// Here, the innerTxn can be called separately too.
suspend fun innerTxn() = susTxn {
    Students.update({Students.id eq 1}) {  st -> st[Students.status] = 0 }
}
# Will send 10 concurrent requests

xargs -I % -P 10 curl -X GET -v http://localhost:8080/test < <(printf '%s\n' {1..10})

MariaDB Log -

Id Command	Argument
12 Query	SET autocommit=0
19 Query	SET autocommit=0
13 Query	SET autocommit=0
13 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
19 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
19 Query	SELECT COUNT(*) FROM students
12 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
13 Query	SELECT COUNT(*) FROM students
13 Query	UPDATE students SET status=0 WHERE students.id = 1
19 Query	UPDATE students SET status=0 WHERE students.id = 1
15 Query	SET autocommit=0
15 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query	SELECT COUNT(*) FROM students
15 Query	SELECT COUNT(*) FROM students
14 Query	SET autocommit=0
12 Query	UPDATE students SET status=0 WHERE students.id = 1
14 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
14 Query	SELECT COUNT(*) FROM students
13 Query	rollback
18 Query	SET autocommit=0
17 Query	SET autocommit=0
18 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
17 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
17 Query	SELECT COUNT(*) FROM students
15 Query	UPDATE students SET status=0 WHERE students.id = 1
19 Query	rollback
17 Query	UPDATE students SET status=0 WHERE students.id = 1
18 Query	SELECT COUNT(*) FROM students
14 Query	UPDATE students SET status=0 WHERE students.id = 1
18 Query	UPDATE students SET status=0 WHERE students.id = 1
12 Query	rollback
19 Query	SET autocommit=1
12 Query	SET autocommit=1
13 Query	SET autocommit=1
15 Query	rollback
14 Query	rollback
17 Query	rollback
18 Query	rollback
14 Query	SET autocommit=1
15 Query	SET autocommit=1
17 Query	SET autocommit=1
18 Query	SET autocommit=1
12 Query	SET autocommit=0
12 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query	SELECT COUNT(*) FROM students
16 Query	SET autocommit=0
16 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
16 Query	SELECT COUNT(*) FROM students
15 Query	SET autocommit=0
16 Query	UPDATE students SET status=0 WHERE students.id = 1
15 Query	SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query	UPDATE students SET status=0 WHERE students.id = 1
16 Query	rollback
15 Query	SELECT COUNT(*) FROM students
12 Query	rollback
16 Query	SET autocommit=1
15 Query	UPDATE students SET status=0 WHERE students.id = 1
15 Query	rollback
15 Query	SET autocommit=1
12 Query	SET autocommit=1

MariaDB Log (sorted by thread ID) -

12 Query    SET autocommit=0
12 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    SELECT COUNT(*) FROM students
12 Query    UPDATE students SET status=0 WHERE students.id = 1
12 Query    rollback
12 Query    SET autocommit=1
12 Query    SET autocommit=0
12 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
12 Query    SELECT COUNT(*) FROM students
12 Query    UPDATE students SET status=0 WHERE students.id = 1
12 Query    rollback
12 Query    SET autocommit=1
13 Query    SET autocommit=0
13 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
13 Query    SELECT COUNT(*) FROM students
13 Query    UPDATE students SET status=0 WHERE students.id = 1
13 Query    rollback
13 Query    SET autocommit=1
14 Query    SET autocommit=0
14 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
14 Query    SELECT COUNT(*) FROM students
14 Query    UPDATE students SET status=0 WHERE students.id = 1
14 Query    rollback
14 Query    SET autocommit=1
15 Query    SET autocommit=0
15 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
15 Query    SELECT COUNT(*) FROM students
15 Query    UPDATE students SET status=0 WHERE students.id = 1
15 Query    rollback
15 Query    SET autocommit=1
15 Query    SET autocommit=0
15 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
15 Query    SELECT COUNT(*) FROM students
15 Query    UPDATE students SET status=0 WHERE students.id = 1
15 Query    rollback
15 Query    SET autocommit=1
16 Query    SET autocommit=0
16 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
16 Query    SELECT COUNT(*) FROM students
16 Query    UPDATE students SET status=0 WHERE students.id = 1
16 Query    rollback
16 Query    SET autocommit=1
17 Query    SET autocommit=0
17 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
17 Query    SELECT COUNT(*) FROM students
17 Query    UPDATE students SET status=0 WHERE students.id = 1
17 Query    rollback
17 Query    SET autocommit=1
18 Query    SET autocommit=0
18 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
18 Query    SELECT COUNT(*) FROM students
18 Query    UPDATE students SET status=0 WHERE students.id = 1
18 Query    rollback
18 Query    SET autocommit=1
19 Query    SET autocommit=0
19 Query    SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
19 Query    SELECT COUNT(*) FROM students
19 Query    UPDATE students SET status=0 WHERE students.id = 1
19 Query    rollback
19 Query    SET autocommit=1    

Conclusion - As you can see, all of those 10 requests got processed and got rolledbacked. But I'm little bit skeptical about some threads (aka DB Connections) executed multiple transactions. Im not sure, whether Hikari reused those connections once a transaction is processed / or queries got mixed up. But my bet is Hikari reused those connections.

Hope this helps you to get a basic understanding and solves your issue. Please let me know any corrections to be made.

@AlexeySoshin
Copy link
Contributor

That's a great analysis, @thamidu, thanks for that!

@pablo-silverback
Copy link
Author

@thamidu thanks for the deep analysis, unfortunately that approach relies on ThreadLocal, which is not AFAIK safe between kotlin coroutines.

For something like this to be safe I believe exposed should take into consideration something like the context pattern (also used in Go's goroutines).

//cc @AlexeySoshin

@ov7a
Copy link

ov7a commented Oct 7, 2022

Hi! Could anyone clarify, why nested suspending transactions code (as recommended by @AlexeySoshin)

  newSuspendedTransaction {
      exec("SELECT 1")
      suspendedTransaction {
          exec("SELECT 2")
      }
  }

has different behavior in comparison with regular transactions?

  transaction {
      exec("SELECT 1")
      transaction {
          exec("SELECT 2")
      }
  }

With suspended transactions only one transaction is actually created and reused for both statements. But with regular transactions two transactions are created, one having another as outerTransaction.

@AlexeySoshin
Copy link
Contributor

@pablo-silverback Kotlin coroutines support context.

You can see some examples there:
#1565

@pabloogc
Copy link

pabloogc commented Sep 1, 2023

I wanted to have a similar behavior using regular transaction blocks and suspending ones, so code like this

suspendedTransaction {
    suspendedTransaction {
       insert(1)
    }
    suspendedTransaction {
       insert(2)
    }
    throw Error("Both should roll back")
}

rolls back both insert(1) and insert(2). Using simply newSuspendedTransaction will not roll back either as of version 0.42.1 but will work properly with regular transaction { ... }.

My actual use code are composition of use cases so if a later one fails it also rolls back the changes of previous ones.

This is the utility function I am using using the public APIs, it seems to work well with the clear limitation that there is still only one transaction at the root, so changing the DB or the isolation level is not possible currently.

import kotlinx.coroutines.Dispatchers
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.exposedLogger
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync
import org.jetbrains.exposed.sql.transactions.experimental.withSuspendTransaction
import kotlin.coroutines.CoroutineContext

suspend fun <T> suspendedTransaction(
    context: CoroutineContext? = null,
    db: Database? = null,
    transactionIsolation: Int? = null,
    statement: suspend Transaction.() -> T
) {
    val existing = TransactionManager.currentOrNull()
    if (existing == null) {
        // Create a new transaction, it will propagate the actual tx reference
        // using coroutine ThreadContextElements into the TransactionManager if this method is nested
        suspendedTransactionAsync(
            context = context,
            db = db,
            transactionIsolation = transactionIsolation,
            statement = statement
        ).await()
    } else {
        if (transactionIsolation != null || db != null) {
            exposedLogger.warn(
                "transactionIsolation=${transactionIsolation}, db=${db} " +
                        "was provided but will " +
                        "be ignored since this transaction was propagated."
            )
        }
        existing.withSuspendTransaction(context = context, statement = statement)
    }
}

@mariusingjer
Copy link

@pablo-silverback regarding this statement:

@thamidu thanks for the deep analysis, unfortunately that approach relies on ThreadLocal, which is not AFAIK safe between kotlin coroutines.

I am not sure you are entirely correct, "TransactionManager.currentOrNull()" relies on thread local, but Expose synchronizes the ThreadLocal state (take a look at

). So I think the last solution posted by @thamidu will work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants