21
21
from dapr .ext .workflow .dapr_workflow_client import DaprWorkflowClient
22
22
from durabletask import client
23
23
import durabletask .internal .orchestrator_service_pb2 as pb
24
+ from grpc import StatusCode , RpcError
24
25
25
26
mock_schedule_result = 'workflow001'
26
27
mock_raise_event_result = 'event001'
29
30
mock_resume_result = 'resume001'
30
31
mock_purge_result = 'purge001'
31
32
mock_instance_id = 'instance001'
33
+ wf_exists = False
34
+
35
+ class SimulatedRpcError (RpcError ):
36
+ def __init__ (self , code , details ):
37
+ self ._code = code
38
+ self ._details = details
39
+
40
+ def code (self ):
41
+ return self ._code
42
+
43
+ def details (self ):
44
+ return self ._details
32
45
33
46
34
47
class FakeTaskHubGrpcClient :
@@ -43,8 +56,13 @@ def schedule_new_orchestration(
43
56
return mock_schedule_result
44
57
45
58
def get_orchestration_state (self , instance_id , fetch_payloads ):
59
+ global wf_exists
60
+ if not wf_exists :
61
+ raise SimulatedRpcError (code = "UNKNOWN" , details = "no such instance exists" )
62
+
46
63
return self ._inner_get_orchestration_state (instance_id , client .OrchestrationStatus .PENDING )
47
64
65
+
48
66
def wait_for_orchestration_start (self , instance_id , fetch_payloads , timeout ):
49
67
return self ._inner_get_orchestration_state (instance_id , client .OrchestrationStatus .RUNNING )
50
68
@@ -100,6 +118,13 @@ def test_client_functions(self):
100
118
)
101
119
assert actual_schedule_result == mock_schedule_result
102
120
121
+ actual_get_result = wfClient .get_workflow_state (instance_id = mock_instance_id ,
122
+ fetch_payloads = True )
123
+ assert actual_get_result is None
124
+
125
+
126
+ global wf_exists
127
+ wf_exists = True
103
128
actual_get_result = wfClient .get_workflow_state (
104
129
instance_id = mock_instance_id , fetch_payloads = True
105
130
)
0 commit comments