diff --git a/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java b/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java index 5cef43a8..ac7a7838 100644 --- a/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java +++ b/src/main/java/io/r2dbc/postgresql/ExceptionFactory.java @@ -60,7 +60,7 @@ static ExceptionFactory withSql(String sql) { * @return the {@link R2dbcException}. * @see ErrorResponse */ - private static R2dbcException createException(ErrorResponse response, String sql) { + static R2dbcException createException(ErrorResponse response, String sql) { ErrorDetails errorDetails = new ErrorDetails(response.getFields()); diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index 30844a9c..b880896c 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -16,6 +16,7 @@ package io.r2dbc.postgresql; +import io.r2dbc.postgresql.api.ErrorDetails; import io.r2dbc.postgresql.api.Notification; import io.r2dbc.postgresql.api.PostgresqlResult; import io.r2dbc.postgresql.api.PostgresqlStatement; @@ -25,11 +26,15 @@ import io.r2dbc.postgresql.client.SimpleQueryMessageFlow; import io.r2dbc.postgresql.client.TransactionStatus; import io.r2dbc.postgresql.codec.Codecs; +import io.r2dbc.postgresql.message.backend.BackendMessage; +import io.r2dbc.postgresql.message.backend.CommandComplete; +import io.r2dbc.postgresql.message.backend.ErrorResponse; import io.r2dbc.postgresql.message.backend.NotificationResponse; import io.r2dbc.postgresql.util.Assert; import io.r2dbc.postgresql.util.Operators; import io.r2dbc.spi.Connection; import io.r2dbc.spi.IsolationLevel; +import io.r2dbc.spi.R2dbcException; import io.r2dbc.spi.ValidationDepth; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -112,9 +117,33 @@ public Mono close() { @Override public Mono commitTransaction() { + + AtomicReference ref = new AtomicReference<>(); return useTransactionStatus(transactionStatus -> { if (IDLE != transactionStatus) { - return exchange("COMMIT"); + return Flux.from(exchange("COMMIT")) + .filter(CommandComplete.class::isInstance) + .cast(CommandComplete.class) + .handle((message, sink) -> { + + // Certain backend versions (e.g. 12.2, 11.7, 10.12, 9.6.17, 9.5.21, etc) + // silently rollback the transaction in the response to COMMIT statement + // in case the transaction has failed. + // See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org + + if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) { + ErrorDetails details = ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction " + + "failure is not known (check server logs?)"); + ref.set(new ExceptionFactory.PostgresqlRollbackException(details)); + return; + } + + sink.next(message); + }).doOnComplete(() -> { + if (ref.get() != null) { + throw ref.get(); + } + }); } else { this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus); return Mono.empty(); @@ -358,9 +387,21 @@ private Mono withTransactionStatus(Function f) { @SuppressWarnings("unchecked") private Publisher exchange(String sql) { - ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql); + AtomicReference ref = new AtomicReference<>(); return (Publisher) SimpleQueryMessageFlow.exchange(this.client, sql) - .handle(exceptionFactory::handleErrorResponse); + .handle((backendMessage, synchronousSink) -> { + + if (backendMessage instanceof ErrorResponse) { + ref.set(ExceptionFactory.createException((ErrorResponse) backendMessage, sql)); + } else { + synchronousSink.next(backendMessage); + } + }) + .doOnComplete(() -> { + if (ref.get() != null) { + throw ref.get(); + } + }); } /** diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java index 8beae9a7..5477ea0c 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java @@ -18,11 +18,15 @@ import io.r2dbc.postgresql.api.PostgresqlConnection; import io.r2dbc.postgresql.api.PostgresqlResult; +import io.r2dbc.postgresql.client.Client; +import io.r2dbc.postgresql.client.TransactionStatus; import io.r2dbc.spi.R2dbcBadGrammarException; -import org.awaitility.Awaitility; +import io.r2dbc.spi.R2dbcException; import org.junit.jupiter.api.Test; import reactor.test.StepVerifier; +import java.lang.reflect.Field; + import static org.assertj.core.api.Assertions.assertThat; /** @@ -36,12 +40,26 @@ void commitShouldRecoverFromFailedTransaction() { this.connection.beginTransaction().as(StepVerifier::create).verifyComplete(); this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class); - this.connection.commitTransaction().as(StepVerifier::create).verifyComplete(); + this.connection.commitTransaction().as(StepVerifier::create).verifyErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(R2dbcException.class); + + Client client = extractClient(); + assertThat(client.getTransactionStatus()).isEqualTo(TransactionStatus.IDLE); + }); - Awaitility.await().until(() -> this.connection.isAutoCommit()); assertThat(this.connection.isAutoCommit()).isTrue(); } + private Client extractClient() { + try { + Field field = io.r2dbc.postgresql.PostgresqlConnection.class.getDeclaredField("client"); + field.setAccessible(true); + return (Client) field.get(this.connection); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + @Test void rollbackShouldRecoverFromFailedTransaction() {