1
1
using Microsoft . Extensions . DependencyInjection ;
2
2
using System ;
3
+ using System . Data . Common ;
3
4
using System . Threading . Tasks ;
5
+ using System . Transactions ;
6
+ using Dapper ;
7
+ using Grpc . Core ;
8
+ using MySqlConnector ;
4
9
using Xunit ;
5
10
6
11
namespace Dtmgrpc . IntegrationTests
@@ -27,5 +32,100 @@ public async Task Submit_Should_Succeed()
27
32
var status = await ITTestHelper . GetTranStatus ( gid ) ;
28
33
Assert . Equal ( "succeed" , status ) ;
29
34
}
35
+
36
+ [ Fact ]
37
+ public async Task DoAndSubmit_Should_DbTrans_Exception ( )
38
+ {
39
+ var provider = ITTestHelper . AddDtmGrpc ( ) ;
40
+ var transFactory = provider . GetRequiredService < IDtmTransFactory > ( ) ;
41
+
42
+ var gid = "msgTestGid" + Guid . NewGuid ( ) . ToString ( ) ;
43
+ var msg = transFactory . NewMsgGrpc ( gid ) ;
44
+ var req = ITTestHelper . GenBusiReq ( false , false ) ;
45
+ var busiGrpc = ITTestHelper . BuisgRPCUrl ;
46
+
47
+ msg . Add ( busiGrpc + "/busi.Busi/TransIn" , req ) ;
48
+ // do TransOut local, then TransIn with DTM.
49
+ await Assert . ThrowsAsync < System . InvalidOperationException > ( async ( ) =>
50
+ {
51
+ // System.InvalidOperationException: A TransactionScope must be disposed on the same thread that it was created.
52
+ //
53
+ // System.InvalidOperationException
54
+ // A TransactionScope must be disposed on the same thread that it was created.
55
+ // at Dtmgrpc.MsgGrpc.DoAndSubmit(String queryPrepared, Func`2 busiCall, CancellationToken cancellationToken) in /home/yunjin/Data/projects/github/dtm-labs/client-csharp/src/Dtmgrpc/Msg/MsgGrpc.cs:line 110
56
+
57
+ await msg . DoAndSubmit ( busiGrpc + "/busi.Busi/QueryPreparedMySqlReal" , async branchBarrier =>
58
+ {
59
+ MySqlConnection conn = getBarrierMySqlConnection ( ) ;
60
+ await branchBarrier . Call ( conn , ( ) =>
61
+ {
62
+ Task task = this . LocalAdjustBalance ( conn , TransOutUID , - req . Amount , "SUCCESS" ) ;
63
+ return task ;
64
+ } ,
65
+ TransactionScopeOption . Required ,
66
+ IsolationLevel . ReadCommitted
67
+ // , default TransactionScopeAsyncFlowOption.Suppress
68
+ ) ;
69
+ } ) ;
70
+ } ) ;
71
+
72
+ await Task . Delay ( 4000 ) ;
73
+ var status = await ITTestHelper . GetTranStatus ( gid ) ;
74
+ // The exception did not affect the local transaction commit
75
+ Assert . Equal ( "succeed" , status ) ;
76
+ }
77
+
78
+ [ Fact ]
79
+ public async Task DoAndSubmit_Should_Succeed ( )
80
+ {
81
+ var provider = ITTestHelper . AddDtmGrpc ( ) ;
82
+ var transFactory = provider . GetRequiredService < IDtmTransFactory > ( ) ;
83
+
84
+ var gid = "msgTestGid" + Guid . NewGuid ( ) . ToString ( ) ;
85
+ var msg = transFactory . NewMsgGrpc ( gid ) ;
86
+ var req = ITTestHelper . GenBusiReq ( false , false ) ;
87
+ var busiGrpc = ITTestHelper . BuisgRPCUrl ;
88
+
89
+ msg . Add ( busiGrpc + "/busi.Busi/TransIn" , req ) ;
90
+ // do TransOut local, then TransIn with DTM.
91
+
92
+ await msg . DoAndSubmit ( busiGrpc + "/busi.Busi/QueryPreparedMySqlReal" , async branchBarrier =>
93
+ {
94
+ MySqlConnection conn = getBarrierMySqlConnection ( ) ;
95
+ await branchBarrier . Call ( conn , ( ) =>
96
+ {
97
+ Task task = this . LocalAdjustBalance ( conn , TransOutUID , - req . Amount , "SUCCESS" ) ;
98
+ return task ;
99
+ } ,
100
+ TransactionScopeOption . Required ,
101
+ IsolationLevel . ReadCommitted ,
102
+ TransactionScopeAsyncFlowOption . Enabled ) ;
103
+ } ) ;
104
+
105
+ await Task . Delay ( 2000 ) ;
106
+ var status = await ITTestHelper . GetTranStatus ( gid ) ;
107
+ Assert . Equal ( "succeed" , status ) ;
108
+ }
109
+
110
+ private static readonly int TransOutUID = 1 ;
111
+
112
+ private static readonly int TransInUID = 2 ;
113
+
114
+ private MySqlConnection getBarrierMySqlConnection ( ) => new ( "Server=localhost;port=3306;User ID=root;Password=123456;Database=dtm_barrier" ) ;
115
+
116
+ private async Task LocalAdjustBalance ( DbConnection conn , int uid , long amount , string result )
117
+ {
118
+ // _logger.LogInformation("AdjustBalanceLocal uid={uid}, amount={amount}, result={result}", uid, amount, result);
119
+
120
+ if ( result . Equals ( "FAILURE" ) )
121
+ {
122
+ throw new RpcException ( new Status ( StatusCode . Aborted , "FAILURE" ) ) ;
123
+ }
124
+
125
+ await conn . ExecuteAsync (
126
+ sql : "update dtm_busi.user_account set balance = balance + @balance where user_id = @user_id" ,
127
+ param : new { balance = amount , user_id = uid }
128
+ ) ;
129
+ }
30
130
}
31
131
}
0 commit comments