Skip to content

Commit de90b57

Browse files
authored
Add support for transaction type (#1303)
This is an internal feature only that allows specifying transaction type on reactive session begin method.
1 parent 5488140 commit de90b57

29 files changed

+299
-127
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,19 @@ public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig conf
123123
}
124124

125125
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(TransactionConfig config) {
126-
return this.beginTransactionAsync(mode, config);
126+
return beginTransactionAsync(mode, config, null);
127+
}
128+
129+
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(TransactionConfig config, String txType) {
130+
return this.beginTransactionAsync(mode, config, txType);
127131
}
128132

129133
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(AccessMode mode, TransactionConfig config) {
134+
return beginTransactionAsync(mode, config, null);
135+
}
136+
137+
public CompletionStage<UnmanagedTransaction> beginTransactionAsync(
138+
AccessMode mode, TransactionConfig config, String txType) {
130139
ensureSessionIsOpen();
131140

132141
// create a chain that acquires connection and starts a transaction
@@ -136,7 +145,7 @@ public CompletionStage<UnmanagedTransaction> beginTransactionAsync(AccessMode mo
136145
ImpersonationUtil.ensureImpersonationSupport(connection, connection.impersonatedUser()))
137146
.thenCompose(connection -> {
138147
UnmanagedTransaction tx = new UnmanagedTransaction(connection, this::handleNewBookmark, fetchSize);
139-
return tx.beginAsync(determineBookmarks(false), config);
148+
return tx.beginAsync(determineBookmarks(false), config, txType);
140149
});
141150

142151
// update the reference to the only known transaction

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,22 @@ protected UnmanagedTransaction(
110110
this.fetchSize = fetchSize;
111111
}
112112

113-
public CompletionStage<UnmanagedTransaction> beginAsync(Set<Bookmark> initialBookmarks, TransactionConfig config) {
114-
return protocol.beginTransaction(connection, initialBookmarks, config).handle((ignore, beginError) -> {
115-
if (beginError != null) {
116-
if (beginError instanceof AuthorizationExpiredException) {
117-
connection.terminateAndRelease(AuthorizationExpiredException.DESCRIPTION);
118-
} else if (beginError instanceof ConnectionReadTimeoutException) {
119-
connection.terminateAndRelease(beginError.getMessage());
120-
} else {
121-
connection.release();
122-
}
123-
throw asCompletionException(beginError);
124-
}
125-
return this;
126-
});
113+
public CompletionStage<UnmanagedTransaction> beginAsync(
114+
Set<Bookmark> initialBookmarks, TransactionConfig config, String txType) {
115+
return protocol.beginTransaction(connection, initialBookmarks, config, txType)
116+
.handle((ignore, beginError) -> {
117+
if (beginError != null) {
118+
if (beginError instanceof AuthorizationExpiredException) {
119+
connection.terminateAndRelease(AuthorizationExpiredException.DESCRIPTION);
120+
} else if (beginError instanceof ConnectionReadTimeoutException) {
121+
connection.terminateAndRelease(beginError.getMessage());
122+
} else {
123+
connection.release();
124+
}
125+
throw asCompletionException(beginError);
126+
}
127+
return this;
128+
});
127129
}
128130

129131
public CompletionStage<Void> closeAsync() {

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,11 @@ void initializeChannel(
7979
* @param connection the connection to use.
8080
* @param bookmarks the bookmarks. Never null, should be empty when there are no bookmarks.
8181
* @param config the transaction configuration. Never null, should be {@link TransactionConfig#empty()} when absent.
82+
* @param txType the Kernel transaction type
8283
* @return a completion stage completed when transaction is started or completed exceptionally when there was a failure.
8384
*/
84-
CompletionStage<Void> beginTransaction(Connection connection, Set<Bookmark> bookmarks, TransactionConfig config);
85+
CompletionStage<Void> beginTransaction(
86+
Connection connection, Set<Bookmark> bookmarks, TransactionConfig config, String txType);
8587

8688
/**
8789
* Commit the unmanaged transaction.

driver/src/main/java/org/neo4j/driver/internal/messaging/request/BeginMessage.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ public BeginMessage(
3838
TransactionConfig config,
3939
DatabaseName databaseName,
4040
AccessMode mode,
41-
String impersonatedUser) {
42-
this(bookmarks, config.timeout(), config.metadata(), mode, databaseName, impersonatedUser);
41+
String impersonatedUser,
42+
String txType) {
43+
this(bookmarks, config.timeout(), config.metadata(), mode, databaseName, impersonatedUser, txType);
4344
}
4445

4546
public BeginMessage(
@@ -48,8 +49,9 @@ public BeginMessage(
4849
Map<String, Value> txMetadata,
4950
AccessMode mode,
5051
DatabaseName databaseName,
51-
String impersonatedUser) {
52-
super(buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmarks, impersonatedUser));
52+
String impersonatedUser,
53+
String txType) {
54+
super(buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmarks, impersonatedUser, txType));
5355
}
5456

5557
@Override

driver/src/main/java/org/neo4j/driver/internal/messaging/request/RunWithMetadataMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public static RunWithMetadataMessage autoCommitTxRunMessage(
5959
Set<Bookmark> bookmarks,
6060
String impersonatedUser) {
6161
Map<String, Value> metadata =
62-
buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmarks, impersonatedUser);
62+
buildMetadata(txTimeout, txMetadata, databaseName, mode, bookmarks, impersonatedUser, null);
6363
return new RunWithMetadataMessage(query.text(), query.parameters().asMap(ofValue()), metadata);
6464
}
6565

driver/src/main/java/org/neo4j/driver/internal/messaging/request/TransactionMetadataBuilder.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,16 @@ public class TransactionMetadataBuilder {
3939
private static final String MODE_KEY = "mode";
4040
private static final String MODE_READ_VALUE = "r";
4141
private static final String IMPERSONATED_USER_KEY = "imp_user";
42+
private static final String TX_TYPE_KEY = "tx_type";
4243

4344
public static Map<String, Value> buildMetadata(
4445
Duration txTimeout,
4546
Map<String, Value> txMetadata,
4647
AccessMode mode,
4748
Set<Bookmark> bookmarks,
48-
String impersonatedUser) {
49-
return buildMetadata(txTimeout, txMetadata, defaultDatabase(), mode, bookmarks, impersonatedUser);
49+
String impersonatedUser,
50+
String txType) {
51+
return buildMetadata(txTimeout, txMetadata, defaultDatabase(), mode, bookmarks, impersonatedUser, txType);
5052
}
5153

5254
public static Map<String, Value> buildMetadata(
@@ -55,20 +57,23 @@ public static Map<String, Value> buildMetadata(
5557
DatabaseName databaseName,
5658
AccessMode mode,
5759
Set<Bookmark> bookmarks,
58-
String impersonatedUser) {
60+
String impersonatedUser,
61+
String txType) {
5962
boolean bookmarksPresent = !bookmarks.isEmpty();
6063
boolean txTimeoutPresent = txTimeout != null;
6164
boolean txMetadataPresent = txMetadata != null && !txMetadata.isEmpty();
6265
boolean accessModePresent = mode == AccessMode.READ;
6366
boolean databaseNamePresent = databaseName.databaseName().isPresent();
6467
boolean impersonatedUserPresent = impersonatedUser != null;
68+
boolean txTypePresent = txType != null;
6569

6670
if (!bookmarksPresent
6771
&& !txTimeoutPresent
6872
&& !txMetadataPresent
6973
&& !accessModePresent
7074
&& !databaseNamePresent
71-
&& !impersonatedUserPresent) {
75+
&& !impersonatedUserPresent
76+
&& !txTypePresent) {
7277
return emptyMap();
7378
}
7479

@@ -89,6 +94,9 @@ public static Map<String, Value> buildMetadata(
8994
if (impersonatedUserPresent) {
9095
result.put(IMPERSONATED_USER_KEY, value(impersonatedUser));
9196
}
97+
if (txTypePresent) {
98+
result.put(TX_TYPE_KEY, value(txType));
99+
}
92100

93101
databaseName.databaseName().ifPresent(name -> result.put(DATABASE_NAME_KEY, value(name)));
94102

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void prepareToCloseChannel(Channel channel) {
113113

114114
@Override
115115
public CompletionStage<Void> beginTransaction(
116-
Connection connection, Set<Bookmark> bookmarks, TransactionConfig config) {
116+
Connection connection, Set<Bookmark> bookmarks, TransactionConfig config, String txType) {
117117
try {
118118
verifyDatabaseNameBeforeTransaction(connection.databaseName());
119119
} catch (Exception error) {
@@ -122,7 +122,7 @@ public CompletionStage<Void> beginTransaction(
122122

123123
CompletableFuture<Void> beginTxFuture = new CompletableFuture<>();
124124
BeginMessage beginMessage = new BeginMessage(
125-
bookmarks, config, connection.databaseName(), connection.mode(), connection.impersonatedUser());
125+
bookmarks, config, connection.databaseName(), connection.mode(), connection.impersonatedUser(), txType);
126126
connection.writeAndFlush(beginMessage, new BeginTxResponseHandler(beginTxFuture));
127127
return beginTxFuture;
128128
}

driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,14 @@ public AbstractReactiveSession(NetworkSession session) {
5050
abstract Publisher<Void> closeTransaction(S transaction, boolean commit);
5151

5252
Publisher<S> doBeginTransaction(TransactionConfig config) {
53+
return doBeginTransaction(config, null);
54+
}
55+
56+
Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
5357
return createSingleItemPublisher(
5458
() -> {
5559
CompletableFuture<S> txFuture = new CompletableFuture<>();
56-
session.beginTransactionAsync(config).whenComplete((tx, completionError) -> {
60+
session.beginTransactionAsync(config, txType).whenComplete((tx, completionError) -> {
5761
if (tx != null) {
5862
txFuture.complete(createTransaction(tx));
5963
} else {

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transac
5757

5858
@Override
5959
public Publisher<ReactiveTransaction> beginTransaction(TransactionConfig config) {
60-
return publisherToFlowPublisher(doBeginTransaction(config));
60+
return beginTransaction(config, null);
61+
}
62+
63+
public Publisher<ReactiveTransaction> beginTransaction(TransactionConfig config, String txType) {
64+
return publisherToFlowPublisher(doBeginTransaction(config, txType));
6165
}
6266

6367
@Override
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.integration.reactive;
20+
21+
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
22+
import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
23+
24+
import java.util.function.Function;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.extension.RegisterExtension;
27+
import org.junit.jupiter.params.ParameterizedTest;
28+
import org.junit.jupiter.params.provider.NullSource;
29+
import org.junit.jupiter.params.provider.ValueSource;
30+
import org.neo4j.driver.TransactionConfig;
31+
import org.neo4j.driver.internal.reactive.InternalReactiveSession;
32+
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
33+
import org.neo4j.driver.reactive.ReactiveTransaction;
34+
import org.neo4j.driver.summary.ResultSummary;
35+
import org.neo4j.driver.testutil.DatabaseExtension;
36+
import org.neo4j.driver.testutil.ParallelizableIT;
37+
import reactor.core.publisher.Mono;
38+
import reactor.test.StepVerifier;
39+
40+
@EnabledOnNeo4jWith(BOLT_V4)
41+
@ParallelizableIT
42+
class InternalReactiveSessionIT {
43+
@RegisterExtension
44+
static final DatabaseExtension neo4j = new DatabaseExtension();
45+
46+
private InternalReactiveSession session;
47+
48+
@BeforeEach
49+
void setUp() {
50+
session = (InternalReactiveSession) neo4j.driver().reactiveSession();
51+
}
52+
53+
@ParameterizedTest
54+
@NullSource
55+
@ValueSource(strings = {"IMPLICIT", ""})
56+
void shouldAcceptTxTypeWhenAvailable(String txType) {
57+
// GIVEN
58+
var txConfig = TransactionConfig.empty();
59+
var txMono = Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(txConfig, txType)));
60+
Function<ReactiveTransaction, Mono<ResultSummary>> txUnit =
61+
tx -> Mono.fromDirect(flowPublisherToFlux(tx.run("RETURN 1")))
62+
.flatMap(result -> Mono.fromDirect(flowPublisherToFlux(result.consume())));
63+
Function<ReactiveTransaction, Mono<Void>> commit = tx -> Mono.fromDirect(flowPublisherToFlux(tx.commit()));
64+
65+
// WHEN
66+
var summaryMono = Mono.usingWhen(txMono, txUnit, commit);
67+
68+
// THEN
69+
StepVerifier.create(summaryMono).expectNextCount(1).expectComplete().verify();
70+
}
71+
}

driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ void shouldReleaseConnectionWhenBeginFails() {
177177
Set<Bookmark> bookmarks = Collections.singleton(InternalBookmark.parse("SomeBookmark"));
178178
TransactionConfig txConfig = TransactionConfig.empty();
179179

180-
RuntimeException e = assertThrows(RuntimeException.class, () -> await(tx.beginAsync(bookmarks, txConfig)));
180+
RuntimeException e =
181+
assertThrows(RuntimeException.class, () -> await(tx.beginAsync(bookmarks, txConfig, null)));
181182

182183
assertEquals(error, e);
183184
verify(connection).release();
@@ -191,7 +192,7 @@ void shouldNotReleaseConnectionWhenBeginSucceeds() {
191192
Set<Bookmark> bookmarks = Collections.singleton(InternalBookmark.parse("SomeBookmark"));
192193
TransactionConfig txConfig = TransactionConfig.empty();
193194

194-
await(tx.beginAsync(bookmarks, txConfig));
195+
await(tx.beginAsync(bookmarks, txConfig, null));
195196

196197
verify(connection, never()).release();
197198
}
@@ -285,8 +286,8 @@ void shouldReleaseConnectionOnConnectionAuthorizationExpiredExceptionFailure() {
285286
Set<Bookmark> bookmarks = Collections.singleton(InternalBookmark.parse("SomeBookmark"));
286287
TransactionConfig txConfig = TransactionConfig.empty();
287288

288-
AuthorizationExpiredException actualException =
289-
assertThrows(AuthorizationExpiredException.class, () -> await(tx.beginAsync(bookmarks, txConfig)));
289+
AuthorizationExpiredException actualException = assertThrows(
290+
AuthorizationExpiredException.class, () -> await(tx.beginAsync(bookmarks, txConfig, null)));
290291

291292
assertSame(exception, actualException);
292293
verify(connection).terminateAndRelease(AuthorizationExpiredException.DESCRIPTION);
@@ -301,8 +302,8 @@ void shouldReleaseConnectionOnConnectionReadTimeoutExceptionFailure() {
301302
Set<Bookmark> bookmarks = Collections.singleton(InternalBookmark.parse("SomeBookmark"));
302303
TransactionConfig txConfig = TransactionConfig.empty();
303304

304-
ConnectionReadTimeoutException actualException =
305-
assertThrows(ConnectionReadTimeoutException.class, () -> await(tx.beginAsync(bookmarks, txConfig)));
305+
ConnectionReadTimeoutException actualException = assertThrows(
306+
ConnectionReadTimeoutException.class, () -> await(tx.beginAsync(bookmarks, txConfig, null)));
306307

307308
assertSame(ConnectionReadTimeoutException.INSTANCE, actualException);
308309
verify(connection).terminateAndRelease(ConnectionReadTimeoutException.INSTANCE.getMessage());
@@ -461,7 +462,7 @@ private static UnmanagedTransaction beginTx(Connection connection) {
461462

462463
private static UnmanagedTransaction beginTx(Connection connection, Set<Bookmark> initialBookmarks) {
463464
UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE);
464-
return await(tx.beginAsync(initialBookmarks, TransactionConfig.empty()));
465+
return await(tx.beginAsync(initialBookmarks, TransactionConfig.empty(), null));
465466
}
466467

467468
private static Connection connectionWithBegin(Consumer<ResponseHandler> beginBehaviour) {

driver/src/test/java/org/neo4j/driver/internal/messaging/encode/BeginMessageEncoderTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class BeginMessageEncoderTest {
5252

5353
@ParameterizedTest
5454
@MethodSource("arguments")
55-
void shouldEncodeBeginMessage(AccessMode mode, String impersonatedUser) throws Exception {
55+
void shouldEncodeBeginMessage(AccessMode mode, String impersonatedUser, String txType) throws Exception {
5656
Set<Bookmark> bookmarks = Collections.singleton(InternalBookmark.parse("neo4j:bookmark:v1:tx42"));
5757

5858
Map<String, Value> txMetadata = new HashMap<>();
@@ -62,7 +62,8 @@ void shouldEncodeBeginMessage(AccessMode mode, String impersonatedUser) throws E
6262
Duration txTimeout = Duration.ofSeconds(1);
6363

6464
encoder.encode(
65-
new BeginMessage(bookmarks, txTimeout, txMetadata, mode, defaultDatabase(), impersonatedUser), packer);
65+
new BeginMessage(bookmarks, txTimeout, txMetadata, mode, defaultDatabase(), impersonatedUser, txType),
66+
packer);
6667

6768
InOrder order = inOrder(packer);
6869
order.verify(packer).packStructHeader(1, BeginMessage.SIGNATURE);
@@ -78,13 +79,17 @@ void shouldEncodeBeginMessage(AccessMode mode, String impersonatedUser) throws E
7879
if (impersonatedUser != null) {
7980
expectedMetadata.put("imp_user", value(impersonatedUser));
8081
}
82+
if (txType != null) {
83+
expectedMetadata.put("tx_type", value(txType));
84+
}
8185

8286
order.verify(packer).pack(expectedMetadata);
8387
}
8488

8589
private static Stream<Arguments> arguments() {
8690
return Arrays.stream(AccessMode.values())
87-
.flatMap(accessMode -> Stream.of(Arguments.of(accessMode, "user"), Arguments.of(accessMode, null)));
91+
.flatMap(accessMode ->
92+
Stream.of(Arguments.of(accessMode, "user", "IMPLICIT"), Arguments.of(accessMode, null, null)));
8893
}
8994

9095
@Test

0 commit comments

Comments
 (0)