From 3bf6a4234ccd08c113c47f719743d2d1c8b802cf Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 25 Aug 2022 10:57:31 +0200 Subject: [PATCH] Await `ReadyForQuery` before emitting errors from transactional control methods. commitTransaction, rollbackTransaction and other methods now await completion of the exchange before emitting error signals to properly synchronize completion. Previously, error signals were emitted before updating the transaction state which could lead to invalid cleanup states if e.g. the commit failed. [resolves #541] Signed-off-by: Mark Paluch --- .../io/r2dbc/postgresql/ExceptionFactory.java | 2 +- .../postgresql/PostgresqlConnection.java | 30 +++++++++++++++---- ...resqlConnectionErrorsIntegrationTests.java | 25 +++++++++++++--- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java b/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java index b1c7a745..3803f952 100644 --- a/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java +++ b/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java @@ -61,7 +61,7 @@ static ExceptionFactory withSql(String sql) { * @return the {@link R2dbcException}. * @see ErrorResponse */ - private static R2dbcException createException(ErrorResponse response, String sql) { + static R2dbcException createException(ErrorResponse response, String sql) { return createException(new ErrorDetails(response.getFields()), sql); } diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index 922a6cb6..da639783 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -29,12 +29,14 @@ import io.r2dbc.postgresql.codec.Codecs; import io.r2dbc.postgresql.message.backend.BackendMessage; import io.r2dbc.postgresql.message.backend.CommandComplete; +import io.r2dbc.postgresql.message.backend.ErrorResponse; import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.util.Assert; import io.r2dbc.postgresql.util.Operators; import io.r2dbc.spi.Connection; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.Option; +import io.r2dbc.spi.R2dbcException; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; import org.reactivestreams.Publisher; @@ -184,6 +186,8 @@ public Mono cancelRequest() { @Override public Mono commitTransaction() { + + AtomicReference ref = new AtomicReference<>(); return useTransactionStatus(transactionStatus -> { if (IDLE != transactionStatus) { return Flux.from(exchange("COMMIT")) @@ -198,13 +202,17 @@ public Mono commitTransaction() { // See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) { - sink.error(new ExceptionFactory.PostgresqlRollbackException(ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction" + - " " + - "failure is not known (check server logs?)"), "COMMIT")); + ErrorDetails details = ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction " + + "failure is not known (check server logs?)"); + ref.set(new ExceptionFactory.PostgresqlRollbackException(details, "COMMIT")); return; } sink.next(message); + }).doOnComplete(() -> { + if (ref.get() != null) { + throw ref.get(); + } }); } else { this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus); @@ -442,9 +450,21 @@ private Mono withTransactionStatus(Function f) { @SuppressWarnings("unchecked") private Flux exchange(String sql) { - ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql); + AtomicReference ref = new AtomicReference<>(); return (Flux) SimpleQueryMessageFlow.exchange(this.client, sql) - .handle(exceptionFactory::handleErrorResponse); + .handle((backendMessage, synchronousSink) -> { + + if (backendMessage instanceof ErrorResponse) { + ref.set(ExceptionFactory.createException((ErrorResponse) backendMessage, sql)); + } else { + synchronousSink.next(backendMessage); + } + }) + .doOnComplete(() -> { + if (ref.get() != null) { + throw ref.get(); + } + }); } private void cleanupIsolationLevel() { diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java index d6b27a2c..5477ea0c 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java @@ -18,12 +18,15 @@ import io.r2dbc.postgresql.api.PostgresqlConnection; import io.r2dbc.postgresql.api.PostgresqlResult; +import io.r2dbc.postgresql.client.Client; +import io.r2dbc.postgresql.client.TransactionStatus; import io.r2dbc.spi.R2dbcBadGrammarException; -import io.r2dbc.spi.R2dbcRollbackException; -import org.awaitility.Awaitility; +import io.r2dbc.spi.R2dbcException; import org.junit.jupiter.api.Test; import reactor.test.StepVerifier; +import java.lang.reflect.Field; + import static org.assertj.core.api.Assertions.assertThat; /** @@ -37,12 +40,26 @@ void commitShouldRecoverFromFailedTransaction() { this.connection.beginTransaction().as(StepVerifier::create).verifyComplete(); this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class); - this.connection.commitTransaction().as(StepVerifier::create).verifyError(R2dbcRollbackException.class); + this.connection.commitTransaction().as(StepVerifier::create).verifyErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(R2dbcException.class); + + Client client = extractClient(); + assertThat(client.getTransactionStatus()).isEqualTo(TransactionStatus.IDLE); + }); - Awaitility.await().until(() -> this.connection.isAutoCommit()); assertThat(this.connection.isAutoCommit()).isTrue(); } + private Client extractClient() { + try { + Field field = io.r2dbc.postgresql.PostgresqlConnection.class.getDeclaredField("client"); + field.setAccessible(true); + return (Client) field.get(this.connection); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + @Test void rollbackShouldRecoverFromFailedTransaction() {