Skip to content

Commit 477b12b

Browse files
authored
bugfix: fix workflow return null handling (#94)
- Fix null value handling for execute again - Add unit test and sample for different workflow return
1 parent 7242fd7 commit 477b12b

File tree

3 files changed

+114
-83
lines changed

3 files changed

+114
-83
lines changed

samples/DtmSample/Controllers/WfTestController.cs

Lines changed: 2 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
using Microsoft.Extensions.Options;
88
using System;
99
using System.IO;
10+
using System.Diagnostics;
1011
using System.Net.Http;
1112
using System.Net.Http.Headers;
1213
using System.Text;
1314
using System.Text.Json;
1415
using System.Text.Unicode;
1516
using System.Threading;
1617
using System.Threading.Tasks;
18+
using Exception = System.Exception;
1719

1820
namespace DtmSample.Controllers
1921
{
@@ -255,84 +257,5 @@ public async Task<IActionResult> TccRollBack(CancellationToken cancellationToken
255257
return Ok(TransResponse.BuildFailureResponse());
256258
}
257259
}
258-
259-
260-
private static readonly string wfNameForResume = "wfNameForResume";
261-
262-
/// <summary>
263-
///
264-
/// </summary>
265-
/// <param name="cancellationToken"></param>
266-
/// <returns></returns>
267-
[HttpPost("wf-crash")]
268-
public async Task<IActionResult> Crash(CancellationToken cancellationToken)
269-
{
270-
if (!_globalTransaction.Exists(wfNameForResume))
271-
{
272-
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
273-
{
274-
var content = new ByteArrayContent(data);
275-
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
276-
277-
var outClient = wf.NewBranch().NewRequest();
278-
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);
279-
280-
// the first branch succeed, then crashed, the dtm server will call back the flowing wf-call-back
281-
// manual stop application
282-
Environment.Exit(0);
283-
284-
var inClient = wf.NewBranch().NewRequest();
285-
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);
286-
287-
return null;
288-
});
289-
}
290-
291-
var req = JsonSerializer.Serialize(new TransRequest("1", -30));
292-
await _globalTransaction.Execute(wfNameForResume, Guid.NewGuid().ToString("N"), Encoding.UTF8.GetBytes(req), true);
293-
294-
return Ok(TransResponse.BuildSucceedResponse());
295-
}
296-
297-
[HttpPost("wf-resume")]
298-
public async Task<IActionResult> WfResume(CancellationToken cancellationToken)
299-
{
300-
try
301-
{
302-
if (!_globalTransaction.Exists(wfNameForResume))
303-
{
304-
// register again after manual crash by Environment.Exit(0);
305-
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
306-
{
307-
var content = new ByteArrayContent(data);
308-
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
309-
310-
var outClient = wf.NewBranch().NewRequest();
311-
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);
312-
313-
var inClient = wf.NewBranch().NewRequest();
314-
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);
315-
316-
return null;
317-
});
318-
}
319-
320-
// prepared call ExecuteByQS
321-
using var bodyMemoryStream = new MemoryStream();
322-
await Request.Body.CopyToAsync(bodyMemoryStream, cancellationToken);
323-
byte[] bytes = bodyMemoryStream.ToArray();
324-
string body = Encoding.UTF8.GetString(bytes);
325-
_logger.LogDebug($"body: {body}");
326-
327-
await _globalTransaction.ExecuteByQS(Request.Query, bodyMemoryStream.ToArray());
328-
329-
return Ok(TransResponse.BuildSucceedResponse());
330-
}
331-
catch (Exception ex)
332-
{
333-
_logger.LogError(ex, "Workflow Error");
334-
return Ok(TransResponse.BuildFailureResponse());
335-
}
336-
}
337260
}
338261
}

src/Dtmworkflow/Workflow.Imp.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ internal async Task<byte[]> Process(WfFunc2 handler, byte[] data)
3939
var status = reply.Transaction.Status;
4040
if (status == DtmCommon.Constant.StatusSucceed)
4141
{
42-
var sRes = Convert.FromBase64String(reply.Transaction.Result);
42+
var sRes = reply.Transaction.Result != null
43+
? Convert.FromBase64String(reply.Transaction.Result)
44+
: null;
4345
return sRes;
4446
}
4547
else if (status == DtmCommon.Constant.StatusFailed)

tests/Dtmworkflow.Tests/WorkflowHttpTests.cs

Lines changed: 109 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,114 @@ public async void Commit_Should_Be_Executed()
255255
rollBackFunc.Verify(x => x.Invoke(It.IsAny<BranchBarrier>()), Times.Never);
256256
commitFunc.Verify(x => x.Invoke(It.IsAny<BranchBarrier>()), Times.Once);
257257
}
258+
259+
[Fact]
260+
public async Task Execute_Result_Should_Be_WfFunc2()
261+
{
262+
var factory = new Mock<IWorkflowFactory>();
263+
var httpClient = new Mock<IDtmClient>();
264+
var grpcClient = new Mock<IDtmgRPCClient>();
265+
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();
266+
267+
SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusPrepared, null);
268+
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);
269+
270+
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
271+
272+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
273+
274+
var wfName = nameof(Execute_Result_Should_Be_WfFunc2);
275+
var gid = Guid.NewGuid().ToString("N");
276+
277+
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
278+
279+
var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
280+
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
281+
282+
Assert.Equal("return value from WfFunc2", Encoding.UTF8.GetString(res));
283+
}
284+
285+
[Fact]
286+
public async Task Execute_Result_Should_Be_Previous()
287+
{
288+
var factory = new Mock<IWorkflowFactory>();
289+
var httpClient = new Mock<IDtmClient>();
290+
var grpcClient = new Mock<IDtmgRPCClient>();
291+
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();
292+
293+
SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusSucceed, "return value from previous");
294+
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);
295+
296+
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
297+
298+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
299+
300+
var wfName = nameof(Execute_Result_Should_Be_Previous);
301+
var gid = Guid.NewGuid().ToString("N");
302+
303+
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
304+
305+
var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
306+
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
307+
308+
Assert.Equal("return value from previous", Encoding.UTF8.GetString(res));
309+
}
310+
311+
[Fact]
312+
public async Task Execute_Again_Result_Should_Be_Previous()
313+
{
314+
var factory = new Mock<IWorkflowFactory>();
315+
var httpClient1 = new Mock<IDtmClient>();
316+
var httpClient2 = new Mock<IDtmClient>();
317+
var grpcClient = new Mock<IDtmgRPCClient>();
318+
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();
319+
320+
// first
321+
SetupPrepareWorkflow(httpClient1, DtmCommon.Constant.StatusPrepared, null);
322+
var wf = SetupWorkFlow(httpClient1, grpcClient, httpBb);
323+
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
324+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
325+
var wfName = nameof(Execute_Again_Result_Should_Be_Previous);
326+
var gid = Guid.NewGuid().ToString("N");
327+
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
328+
var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
329+
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
330+
Assert.Equal("return value from WfFunc2", Encoding.UTF8.GetString(res));
331+
332+
// again
333+
SetupPrepareWorkflow(httpClient2, DtmCommon.Constant.StatusSucceed, "return value from previous");
334+
wf = SetupWorkFlow(httpClient2, grpcClient, httpBb);
335+
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
336+
wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
337+
gid = Guid.NewGuid().ToString("N");
338+
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
339+
req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
340+
res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
341+
Assert.Equal("return value from previous", Encoding.UTF8.GetString(res));
342+
}
343+
344+
[Fact]
345+
public async Task Execute_Again_Result_StringEmpty()
346+
{
347+
var factory = new Mock<IWorkflowFactory>();
348+
var httpClient = new Mock<IDtmClient>();
349+
var grpcClient = new Mock<IDtmgRPCClient>();
350+
var httpBb = new Mock<Dtmcli.IBranchBarrierFactory>();
351+
352+
// again
353+
SetupPrepareWorkflow(httpClient, DtmCommon.Constant.StatusSucceed, null);
354+
var wf = SetupWorkFlow(httpClient, grpcClient, httpBb);
355+
factory.Setup(x => x.NewWorkflow(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<bool>())).Returns(wf.Object);
356+
var wfgt = new WorkflowGlobalTransaction(factory.Object, NullLoggerFactory.Instance);
357+
var wfName = nameof(Execute_Again_Result_StringEmpty);
358+
var gid = Guid.NewGuid().ToString("N");
359+
wfgt.Register(wfName, (workflow, data) => Task.FromResult(Encoding.UTF8.GetBytes("return value from WfFunc2")));
360+
var req = JsonSerializer.Serialize(new { userId = "1", amount = 30 });
361+
var res = await wfgt.Execute(wfName, gid, Encoding.UTF8.GetBytes(req), true);
362+
Assert.Null(res);
363+
}
258364

259-
private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, string result, List<DtmProgressDto> progressDtos = null)
365+
private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, string? result, List<DtmProgressDto> progressDtos = null)
260366
{
261367
var httpResp = new HttpResponseMessage(HttpStatusCode.OK);
262368
httpResp.Content = new StringContent(JsonSerializer.Serialize(
@@ -265,9 +371,9 @@ private void SetupPrepareWorkflow(Mock<IDtmClient> httpClient, string status, st
265371
Transaction = new DtmTransactionDto
266372
{
267373
Status = status,
268-
Result = Convert.ToBase64String(Encoding.UTF8.GetBytes(result))
374+
Result = result == null ? null : Convert.ToBase64String(Encoding.UTF8.GetBytes(result))
269375
},
270-
Progresses = progressDtos
376+
Progresses = progressDtos ?? []
271377
}));
272378
httpClient.Setup(x => x.PrepareWorkflow(It.IsAny<DtmCommon.TransBase>(), It.IsAny<CancellationToken>())).Returns(Task.FromResult(httpResp));
273379
}

0 commit comments

Comments
 (0)