1
1
package com .arangodb .springframework .transaction ;
2
2
3
3
import com .arangodb .ArangoDatabase ;
4
- import com .arangodb .DbName ;
5
4
import com .arangodb .entity .StreamTransactionEntity ;
6
5
import com .arangodb .entity .StreamTransactionStatus ;
7
6
import com .arangodb .model .StreamTransactionOptions ;
12
11
import org .springframework .transaction .TransactionDefinition ;
13
12
import org .springframework .transaction .interceptor .TransactionAttribute ;
14
13
import org .springframework .transaction .support .SmartTransactionObject ;
14
+ import org .springframework .transaction .support .TransactionSynchronizationUtils ;
15
15
16
16
import java .util .Collection ;
17
+ import java .util .Collections ;
17
18
import java .util .HashSet ;
18
19
import java .util .Set ;
19
20
21
+ /**
22
+ * Transaction object created by {@link ArangoTransactionManager}.
23
+ */
20
24
class ArangoTransactionObject implements SmartTransactionObject {
21
25
22
26
private static final Log logger = LogFactory .getLog (ArangoTransactionObject .class );
23
27
24
28
private final ArangoDatabase database ;
25
- private final Set < String > writeCollections = new HashSet <>() ;
29
+ private final ArangoTransactionResource resource ;
26
30
private int timeout ;
27
- private StreamTransactionEntity streamTransaction ;
31
+ private StreamTransactionEntity transaction ;
28
32
29
33
ArangoTransactionObject (ArangoDatabase database , int defaultTimeout , @ Nullable ArangoTransactionResource resource ) {
30
34
this .database = database ;
35
+ this .resource = resource == null ? new ArangoTransactionResource (null , Collections .emptySet (), false ) : resource ;
31
36
this .timeout = defaultTimeout ;
32
- if (resource != null ) {
33
- writeCollections .addAll (resource .getCollectionNames ());
34
- if (resource .getStreamTransactionId () != null ) {
35
- streamTransaction = database .getStreamTransaction (resource .getStreamTransactionId ());
36
- }
37
- }
38
37
}
39
38
40
- ArangoTransactionResource createResource () {
41
- return new ArangoTransactionResource ( streamTransaction == null ? null : streamTransaction . getId (), writeCollections ) ;
39
+ ArangoTransactionResource getResource () {
40
+ return resource ;
42
41
}
43
42
44
43
boolean exists () {
45
- return streamTransaction != null ;
44
+ return resource . getStreamTransactionId () != null ;
46
45
}
47
46
48
47
void configure (TransactionDefinition definition ) {
@@ -56,55 +55,70 @@ void configure(TransactionDefinition definition) {
56
55
57
56
ArangoTransactionResource getOrBegin (Collection <String > collections ) {
58
57
addCollections (collections );
59
- if (streamTransaction != null ) {
60
- return createResource ();
58
+ if (resource . getStreamTransactionId () != null ) {
59
+ return getResource ();
61
60
}
62
61
StreamTransactionOptions options = new StreamTransactionOptions ()
63
62
.allowImplicit (true )
64
- .writeCollections (writeCollections .toArray (new String [0 ]))
63
+ .writeCollections (resource . getCollectionNames () .toArray (new String [0 ]))
65
64
.lockTimeout (Math .max (timeout , 0 ));
66
- streamTransaction = database .beginStreamTransaction (options );
65
+ transaction = database .beginStreamTransaction (options );
66
+ resource .setStreamTransactionId (transaction .getId ());
67
67
if (logger .isDebugEnabled ()) {
68
- logger .debug ("Began stream transaction " + streamTransaction . getId () + " writing collections " + writeCollections );
68
+ logger .debug ("Began stream transaction " + resource . getStreamTransactionId () + " writing collections " + resource . getCollectionNames () );
69
69
}
70
- return createResource ();
70
+ return getResource ();
71
71
}
72
72
73
73
void commit () {
74
- if (streamTransaction != null && streamTransaction . getStatus () == StreamTransactionStatus .running ) {
75
- database .commitStreamTransaction (streamTransaction . getId ());
74
+ if (isStatus ( StreamTransactionStatus .running ) ) {
75
+ database .commitStreamTransaction (resource . getStreamTransactionId ());
76
76
}
77
77
}
78
78
79
79
void rollback () {
80
- if (streamTransaction != null && streamTransaction . getStatus () == StreamTransactionStatus .running ) {
81
- database .abortStreamTransaction (streamTransaction . getId ());
80
+ if (isStatus ( StreamTransactionStatus .running ) ) {
81
+ database .abortStreamTransaction (resource . getStreamTransactionId ());
82
82
}
83
+ setRollbackOnly ();
83
84
}
84
85
85
86
@ Override
86
87
public boolean isRollbackOnly () {
87
- return streamTransaction != null && streamTransaction .getStatus () == StreamTransactionStatus .aborted ;
88
+ return resource .isRollbackOnly () || isStatus (StreamTransactionStatus .aborted );
89
+ }
90
+
91
+ public void setRollbackOnly () {
92
+ resource .setRollbackOnly (true );
88
93
}
89
94
90
95
@ Override
91
96
public void flush () {
92
- // nothing to do
97
+ TransactionSynchronizationUtils . triggerFlush ();
93
98
}
94
99
95
100
@ Override
96
101
public String toString () {
97
- return streamTransaction == null ? "(not begun)" : streamTransaction . getId ();
102
+ return resource . getStreamTransactionId () == null ? "(not begun)" : resource . getStreamTransactionId ();
98
103
}
99
104
100
105
private void addCollections (Collection <String > collections ) {
101
- if (streamTransaction != null ) {
102
- if (!writeCollections .containsAll (collections )) {
106
+ if (resource . getStreamTransactionId () != null ) {
107
+ if (!resource . getCollectionNames () .containsAll (collections )) {
103
108
Set <String > additional = new HashSet <>(collections );
104
- additional .removeAll (writeCollections );
105
- throw new IllegalTransactionStateException ("Stream transaction already started on collections " + writeCollections + ", no additional collections allowed: " + additional );
109
+ additional .removeAll (resource . getCollectionNames () );
110
+ throw new IllegalTransactionStateException ("Stream transaction already started on collections " + resource . getCollectionNames () + ", no additional collections allowed: " + additional );
106
111
}
107
112
}
108
- writeCollections .addAll (collections );
113
+ HashSet <String > all = new HashSet <>(resource .getCollectionNames ());
114
+ all .addAll (collections );
115
+ resource .setCollectionNames (all );
116
+ }
117
+
118
+ private boolean isStatus (StreamTransactionStatus status ) {
119
+ if (transaction == null && resource .getStreamTransactionId () != null ) {
120
+ transaction = database .getStreamTransaction (resource .getStreamTransactionId ());
121
+ }
122
+ return transaction != null && transaction .getStatus () == status ;
109
123
}
110
124
}
0 commit comments