From bb597145397d6fd337e328da2aeff1dfc2bde890 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 25 Aug 2022 10:57:31 +0200 Subject: [PATCH] Await `ReadyForQuery` before emitting errors from transactional control methods. commitTransaction, rollbackTransaction and other methods now await completion of the exchange before emitting error signals to properly synchronize completion. Previously, error signals were emitted before updating the transaction state which could lead to invalid cleanup states if e.g. the commit failed. [resolves #541] Signed-off-by: Mark Paluch --- .../io/r2dbc/postgresql/ExceptionFactory.java | 2 +- .../postgresql/PostgresqlConnection.java | 30 +++++++++++++++---- ...resqlConnectionErrorsIntegrationTests.java | 25 +++++++++++++--- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java b/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java index b1c7a745..3803f952 100644 --- a/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java +++ b/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java @@ -61,7 +61,7 @@ static ExceptionFactory withSql(String sql) { * @return the {@link R2dbcException}. * @see ErrorResponse */ - private static R2dbcException createException(ErrorResponse response, String sql) { + static R2dbcException createException(ErrorResponse response, String sql) { return createException(new ErrorDetails(response.getFields()), sql); } diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index 0ea8cbe9..04b0fb6f 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -30,12 +30,14 @@ import io.r2dbc.postgresql.codec.Codecs; import io.r2dbc.postgresql.message.backend.BackendMessage; import io.r2dbc.postgresql.message.backend.CommandComplete; +import io.r2dbc.postgresql.message.backend.ErrorResponse; import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.util.Assert; import io.r2dbc.postgresql.util.Operators; import io.r2dbc.spi.Connection; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.Option; +import io.r2dbc.spi.R2dbcException; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; import org.reactivestreams.Publisher; @@ -189,6 +191,8 @@ public Mono cancelRequest() { @Override public Mono commitTransaction() { + + AtomicReference ref = new AtomicReference<>(); return useTransactionStatus(transactionStatus -> { if (IDLE != transactionStatus) { return Flux.from(exchange("COMMIT")) @@ -203,13 +207,17 @@ public Mono commitTransaction() { // See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) { - sink.error(new ExceptionFactory.PostgresqlRollbackException(ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction" + - " " + - "failure is not known (check server logs?)"), "COMMIT")); + ErrorDetails details = ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction " + + "failure is not known (check server logs?)"); + ref.set(new ExceptionFactory.PostgresqlRollbackException(details, "COMMIT")); return; } sink.next(message); + }).doOnComplete(() -> { + if (ref.get() != null) { + throw ref.get(); + } }); } else { this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus); @@ -452,9 +460,21 @@ private Mono withTransactionStatus(Function f) { @SuppressWarnings("unchecked") private Flux exchange(String sql) { - ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql); + AtomicReference ref = new AtomicReference<>(); return (Flux) SimpleQueryMessageFlow.exchange(this.client, sql) - .handle(exceptionFactory::handleErrorResponse); + .handle((backendMessage, synchronousSink) -> { + + if (backendMessage instanceof ErrorResponse) { + ref.set(ExceptionFactory.createException((ErrorResponse) backendMessage, sql)); + } else { + synchronousSink.next(backendMessage); + } + }) + .doOnComplete(() -> { + if (ref.get() != null) { + throw ref.get(); + } + }); } private void cleanupIsolationLevel() { diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java index d6b27a2c..5477ea0c 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java @@ -18,12 +18,15 @@ import io.r2dbc.postgresql.api.PostgresqlConnection; import io.r2dbc.postgresql.api.PostgresqlResult; +import io.r2dbc.postgresql.client.Client; +import io.r2dbc.postgresql.client.TransactionStatus; import io.r2dbc.spi.R2dbcBadGrammarException; -import io.r2dbc.spi.R2dbcRollbackException; -import org.awaitility.Awaitility; +import io.r2dbc.spi.R2dbcException; import org.junit.jupiter.api.Test; import reactor.test.StepVerifier; +import java.lang.reflect.Field; + import static org.assertj.core.api.Assertions.assertThat; /** @@ -37,12 +40,26 @@ void commitShouldRecoverFromFailedTransaction() { this.connection.beginTransaction().as(StepVerifier::create).verifyComplete(); this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class); - this.connection.commitTransaction().as(StepVerifier::create).verifyError(R2dbcRollbackException.class); + this.connection.commitTransaction().as(StepVerifier::create).verifyErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(R2dbcException.class); + + Client client = extractClient(); + assertThat(client.getTransactionStatus()).isEqualTo(TransactionStatus.IDLE); + }); - Awaitility.await().until(() -> this.connection.isAutoCommit()); assertThat(this.connection.isAutoCommit()).isTrue(); } + private Client extractClient() { + try { + Field field = io.r2dbc.postgresql.PostgresqlConnection.class.getDeclaredField("client"); + field.setAccessible(true); + return (Client) field.get(this.connection); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + @Test void rollbackShouldRecoverFromFailedTransaction() {