Skip to content

Commit 0b109bf

Browse files
Catch error in sdk when workflow instance not found (dapr#771)
* Catch error in sdk when workflow instance not found Signed-off-by: Elena Kolevska <[email protected]> * fixes demo workflow example test Signed-off-by: Elena Kolevska <[email protected]> * Only return None for the correct error Signed-off-by: Elena Kolevska <[email protected]> * Adds test Signed-off-by: Elena Kolevska <[email protected]> * Linter Signed-off-by: Elena Kolevska <[email protected]> * Extends test Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Elena Kolevska <[email protected]>
1 parent fc4980d commit 0b109bf

File tree

4 files changed

+54
-3
lines changed

4 files changed

+54
-3
lines changed

examples/demo_workflow/app.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,17 @@ def main():
139139

140140
# Pause Test
141141
d.pause_workflow(instance_id=instance_id, workflow_component=workflow_component)
142+
sleep(3)
143+
142144
get_response = d.get_workflow(
143145
instance_id=instance_id, workflow_component=workflow_component
144146
)
145147
print(f'Get response from {workflow_name} after pause call: {get_response.runtime_status}')
146148

147149
# Resume Test
148150
d.resume_workflow(instance_id=instance_id, workflow_component=workflow_component)
151+
sleep(3)
152+
149153
get_response = d.get_workflow(
150154
instance_id=instance_id, workflow_component=workflow_component
151155
)

examples/workflow/monitor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def send_alert(ctx, message: str):
6868
status = wf_client.get_workflow_state(job_id)
6969
except Exception:
7070
pass
71+
7172
if not status or status.runtime_status.name != 'RUNNING':
7273
# TODO update to use reuse_id_policy
7374
instance_id = wf_client.schedule_new_workflow(

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from dapr.ext.workflow.workflow_state import WorkflowState
2525
from dapr.ext.workflow.workflow_context import Workflow
2626
from dapr.ext.workflow.util import getAddress
27+
from grpc import RpcError
2728

2829
from dapr.clients import DaprInternalError
2930
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
@@ -130,8 +131,17 @@ def get_workflow_state(
130131
exist.
131132
132133
"""
133-
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
134-
return WorkflowState(state) if state else None
134+
try:
135+
state = self.__obj.get_orchestration_state(instance_id, fetch_payloads=fetch_payloads)
136+
return WorkflowState(state) if state else None
137+
except RpcError as error:
138+
if 'no such instance exists' in error.details():
139+
self._logger.warning(f'Workflow instance not found: {instance_id}')
140+
return None
141+
self._logger.error(
142+
f'Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}'
143+
)
144+
raise
135145

136146
def wait_for_workflow_start(
137147
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 60

ext/dapr-ext-workflow/tests/test_workflow_client.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
2222
from durabletask import client
2323
import durabletask.internal.orchestrator_service_pb2 as pb
24+
from grpc import RpcError
2425

2526
mock_schedule_result = 'workflow001'
2627
mock_raise_event_result = 'event001'
@@ -29,6 +30,19 @@
2930
mock_resume_result = 'resume001'
3031
mock_purge_result = 'purge001'
3132
mock_instance_id = 'instance001'
33+
wf_status = 'not-found'
34+
35+
36+
class SimulatedRpcError(RpcError):
37+
def __init__(self, code, details):
38+
self._code = code
39+
self._details = details
40+
41+
def code(self):
42+
return self._code
43+
44+
def details(self):
45+
return self._details
3246

3347

3448
class FakeTaskHubGrpcClient:
@@ -43,7 +57,15 @@ def schedule_new_orchestration(
4357
return mock_schedule_result
4458

4559
def get_orchestration_state(self, instance_id, fetch_payloads):
46-
return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.PENDING)
60+
global wf_status
61+
if wf_status == 'not-found':
62+
raise SimulatedRpcError(code='UNKNOWN', details='no such instance exists')
63+
elif wf_status == 'found':
64+
return self._inner_get_orchestration_state(
65+
instance_id, client.OrchestrationStatus.PENDING
66+
)
67+
else:
68+
raise SimulatedRpcError(code='UNKNOWN', details='unknown error')
4769

4870
def wait_for_orchestration_start(self, instance_id, fetch_payloads, timeout):
4971
return self._inner_get_orchestration_state(instance_id, client.OrchestrationStatus.RUNNING)
@@ -100,6 +122,20 @@ def test_client_functions(self):
100122
)
101123
assert actual_schedule_result == mock_schedule_result
102124

125+
global wf_status
126+
wf_status = 'not-found'
127+
actual_get_result = wfClient.get_workflow_state(
128+
instance_id=mock_instance_id, fetch_payloads=True
129+
)
130+
assert actual_get_result is None
131+
132+
wf_status = 'error'
133+
with self.assertRaises(RpcError):
134+
wfClient.get_workflow_state(instance_id=mock_instance_id, fetch_payloads=True)
135+
136+
assert actual_get_result is None
137+
138+
wf_status = 'found'
103139
actual_get_result = wfClient.get_workflow_state(
104140
instance_id=mock_instance_id, fetch_payloads=True
105141
)

0 commit comments

Comments
 (0)