Skip to content

Commit c5a5b3b

Browse files
committed
Throw PostgresqlRollbackException on silent rollback
We now throw PostgresqlRollbackException if Postgres silently rolls back a transaction when calling commitTransaction(). https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org [resolves pgjdbc#274]
1 parent fe2bb0e commit c5a5b3b

File tree

4 files changed

+102
-5
lines changed

4 files changed

+102
-5
lines changed

pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
<properties>
3535
<assertj.version>3.16.1</assertj.version>
36+
<awaitility.version>4.0.3</awaitility.version>
3637
<blockhound.version>1.0.3.RELEASE</blockhound.version>
3738
<hikari-cp.version>3.4.5</hikari-cp.version>
3839
<java.version>1.8</java.version>
@@ -186,6 +187,12 @@
186187
<version>${assertj.version}</version>
187188
<scope>test</scope>
188189
</dependency>
190+
<dependency>
191+
<groupId>org.awaitility</groupId>
192+
<artifactId>awaitility</artifactId>
193+
<version>${awaitility.version}</version>
194+
<scope>test</scope>
195+
</dependency>
189196
<dependency>
190197
<groupId>org.junit.jupiter</groupId>
191198
<artifactId>junit-jupiter-api</artifactId>

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.r2dbc.postgresql.api.ErrorDetails;
1920
import io.r2dbc.postgresql.api.Notification;
2021
import io.r2dbc.postgresql.api.PostgresqlResult;
2122
import io.r2dbc.postgresql.api.PostgresqlStatement;
@@ -24,6 +25,8 @@
2425
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
2526
import io.r2dbc.postgresql.client.TransactionStatus;
2627
import io.r2dbc.postgresql.codec.Codecs;
28+
import io.r2dbc.postgresql.message.backend.BackendMessage;
29+
import io.r2dbc.postgresql.message.backend.CommandComplete;
2730
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2831
import io.r2dbc.postgresql.util.Assert;
2932
import io.r2dbc.postgresql.util.Operators;
@@ -112,8 +115,26 @@ public Mono<Void> cancelRequest() {
112115
@Override
113116
public Mono<Void> commitTransaction() {
114117
return useTransactionStatus(transactionStatus -> {
115-
if (OPEN == transactionStatus) {
116-
return exchange("COMMIT");
118+
if (IDLE != transactionStatus) {
119+
return Flux.from(exchange("COMMIT"))
120+
.filter(CommandComplete.class::isInstance)
121+
.cast(CommandComplete.class)
122+
.<BackendMessage>handle((message, sink) -> {
123+
124+
// Certain backend versions (e.g. 12.2, 11.7, 10.12, 9.6.17, 9.5.21, etc)
125+
// silently rollback the transaction in the response to COMMIT statement
126+
// in case the transaction has failed.
127+
// See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org
128+
129+
if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) {
130+
sink.error(new ExceptionFactory.PostgresqlRollbackException(ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction" +
131+
" " +
132+
"failure is not known (check server logs?)")));
133+
return;
134+
}
135+
136+
sink.next(message);
137+
});
117138
} else {
118139
this.logger.debug("Skipping commit transaction because status is {}", transactionStatus);
119140
return Mono.empty();
@@ -341,9 +362,10 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
341362
return Mono.defer(() -> Mono.just(f.apply(this.client.getTransactionStatus())));
342363
}
343364

344-
private Publisher<?> exchange(String sql) {
365+
@SuppressWarnings("unchecked")
366+
private <T> Publisher<T> exchange(String sql) {
345367
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
346-
return SimpleQueryMessageFlow.exchange(this.client, sql)
368+
return (Publisher<T>) SimpleQueryMessageFlow.exchange(this.client, sql)
347369
.handle(exceptionFactory::handleErrorResponse)
348370
.as(Operators::discardOnCancel);
349371
}

src/main/java/io/r2dbc/postgresql/api/ErrorDetails.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.r2dbc.postgresql.message.backend.Field.FieldType;
2222
import io.r2dbc.postgresql.util.Assert;
2323

24+
import java.util.Collections;
2425
import java.util.HashMap;
2526
import java.util.List;
2627
import java.util.Map;
@@ -89,7 +90,7 @@ public final class ErrorDetails {
8990

9091

9192
/**
92-
* Creates a new exception.
93+
* Create new {@link ErrorDetails} from {@link List} of {@link Field fields}.
9394
*
9495
* @param fields the fields to be used to populate the exception
9596
* @throws IllegalArgumentException if {@code fields} is {@code null}
@@ -120,6 +121,16 @@ private ErrorDetails(Map<FieldType, String> fields) {
120121
this.where = fields.get(WHERE);
121122
}
122123

124+
/**
125+
* Create a new {@link ErrorDetails}
126+
*
127+
* @param message the error message
128+
* @return the {@link ErrorDetails} object
129+
*/
130+
public static ErrorDetails fromMessage(String message) {
131+
return new ErrorDetails(Collections.singletonMap(MESSAGE, message));
132+
}
133+
123134
@Override
124135
public boolean equals(Object o) {
125136
if (this == o) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.api.PostgresqlConnection;
20+
import io.r2dbc.postgresql.api.PostgresqlResult;
21+
import io.r2dbc.spi.R2dbcBadGrammarException;
22+
import io.r2dbc.spi.R2dbcRollbackException;
23+
import org.awaitility.Awaitility;
24+
import org.junit.jupiter.api.Test;
25+
import reactor.test.StepVerifier;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
29+
/**
30+
* Integration tests for various error cases using {@link PostgresqlConnection}.
31+
*/
32+
final class PostgresqlConnectionErrorsIntegrationTests extends AbstractIntegrationTests {
33+
34+
@Test
35+
void commitShouldRecoverFromFailedTransaction() {
36+
37+
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
38+
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);
39+
40+
this.connection.commitTransaction().as(StepVerifier::create).verifyError(R2dbcRollbackException.class);
41+
42+
Awaitility.await().until(() -> this.connection.isAutoCommit());
43+
assertThat(this.connection.isAutoCommit()).isTrue();
44+
}
45+
46+
@Test
47+
void rollbackShouldRecoverFromFailedTransaction() {
48+
49+
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
50+
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);
51+
52+
this.connection.rollbackTransaction().as(StepVerifier::create).verifyComplete();
53+
54+
assertThat(this.connection.isAutoCommit()).isTrue();
55+
}
56+
57+
}

0 commit comments

Comments
 (0)