Skip to content

Commit

Permalink
Await ReadyForQuery before emitting errors from transactional contr…
Browse files Browse the repository at this point in the history
…ol methods.

commitTransaction, rollbackTransaction and other methods now await completion of the exchange before emitting error signals to properly synchronize completion.

Previously, error signals were emitted before updating the transaction state which could lead to invalid cleanup states if e.g. the commit failed.

[resolves #541]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Aug 25, 2022
1 parent 9f4d2a6 commit 7424903
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/r2dbc/postgresql/ExceptionFactory.java
Expand Up @@ -60,7 +60,7 @@ static ExceptionFactory withSql(String sql) {
* @return the {@link R2dbcException}.
* @see ErrorResponse
*/
private static R2dbcException createException(ErrorResponse response, String sql) {
static R2dbcException createException(ErrorResponse response, String sql) {

ErrorDetails errorDetails = new ErrorDetails(response.getFields());

Expand Down
47 changes: 44 additions & 3 deletions src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
Expand Up @@ -16,6 +16,7 @@

package io.r2dbc.postgresql;

import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.api.Notification;
import io.r2dbc.postgresql.api.PostgresqlResult;
import io.r2dbc.postgresql.api.PostgresqlStatement;
Expand All @@ -25,11 +26,15 @@
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.postgresql.codec.Codecs;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.ValidationDepth;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -112,9 +117,33 @@ public Mono<Void> close() {

@Override
public Mono<Void> commitTransaction() {

AtomicReference<R2dbcException> ref = new AtomicReference<>();
return useTransactionStatus(transactionStatus -> {
if (IDLE != transactionStatus) {
return exchange("COMMIT");
return Flux.from(exchange("COMMIT"))
.filter(CommandComplete.class::isInstance)
.cast(CommandComplete.class)
.<BackendMessage>handle((message, sink) -> {

// Certain backend versions (e.g. 12.2, 11.7, 10.12, 9.6.17, 9.5.21, etc)
// silently rollback the transaction in the response to COMMIT statement
// in case the transaction has failed.
// See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org

if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) {
ErrorDetails details = ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction " +
"failure is not known (check server logs?)");
ref.set(new ExceptionFactory.PostgresqlRollbackException(details));
return;
}

sink.next(message);
}).doOnComplete(() -> {
if (ref.get() != null) {
throw ref.get();
}
});
} else {
this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus);
return Mono.empty();
Expand Down Expand Up @@ -358,9 +387,21 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {

@SuppressWarnings("unchecked")
private <T> Publisher<T> exchange(String sql) {
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
AtomicReference<R2dbcException> ref = new AtomicReference<>();
return (Publisher<T>) SimpleQueryMessageFlow.exchange(this.client, sql)
.handle(exceptionFactory::handleErrorResponse);
.handle((backendMessage, synchronousSink) -> {

if (backendMessage instanceof ErrorResponse) {
ref.set(ExceptionFactory.createException((ErrorResponse) backendMessage, sql));
} else {
synchronousSink.next(backendMessage);
}
})
.doOnComplete(() -> {
if (ref.get() != null) {
throw ref.get();
}
});
}

/**
Expand Down
Expand Up @@ -18,11 +18,15 @@

import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.api.PostgresqlResult;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.spi.R2dbcBadGrammarException;
import org.awaitility.Awaitility;
import io.r2dbc.spi.R2dbcException;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

import java.lang.reflect.Field;

import static org.assertj.core.api.Assertions.assertThat;

/**
Expand All @@ -36,12 +40,26 @@ void commitShouldRecoverFromFailedTransaction() {
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);

this.connection.commitTransaction().as(StepVerifier::create).verifyComplete();
this.connection.commitTransaction().as(StepVerifier::create).verifyErrorSatisfies(throwable -> {
assertThat(throwable).isInstanceOf(R2dbcException.class);

Client client = extractClient();
assertThat(client.getTransactionStatus()).isEqualTo(TransactionStatus.IDLE);
});

Awaitility.await().until(() -> this.connection.isAutoCommit());
assertThat(this.connection.isAutoCommit()).isTrue();
}

private Client extractClient() {
try {
Field field = io.r2dbc.postgresql.PostgresqlConnection.class.getDeclaredField("client");
field.setAccessible(true);
return (Client) field.get(this.connection);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

@Test
void rollbackShouldRecoverFromFailedTransaction() {

Expand Down

0 comments on commit 7424903

Please sign in to comment.