Skip to content

Commit f430bc2

Browse files
Removes default timeout for wait_for_orchestration_start and wait_for_orchestration_completion
Signed-off-by: Elena Kolevska <[email protected]>
1 parent 61a8492 commit f430bc2

File tree

2 files changed

+71
-8
lines changed

2 files changed

+71
-8
lines changed

durabletask/client.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,13 @@ def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = Tr
129129

130130
def wait_for_orchestration_start(self, instance_id: str, *,
131131
fetch_payloads: bool = False,
132-
timeout: int = 60) -> Optional[OrchestrationState]:
132+
timeout: int = 0) -> Optional[OrchestrationState]:
133133
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
134134
try:
135-
self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to start.")
136-
res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=timeout)
135+
grpc_timeout = None if timeout == 0 else timeout
136+
self._logger.info(
137+
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start.")
138+
res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
137139
return new_orchestration_state(req.instanceId, res)
138140
except grpc.RpcError as rpc_error:
139141
if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: # type: ignore
@@ -144,11 +146,13 @@ def wait_for_orchestration_start(self, instance_id: str, *,
144146

145147
def wait_for_orchestration_completion(self, instance_id: str, *,
146148
fetch_payloads: bool = True,
147-
timeout: int = 60) -> Optional[OrchestrationState]:
149+
timeout: int = 0) -> Optional[OrchestrationState]:
148150
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
149151
try:
150-
self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to complete.")
151-
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(req, timeout=timeout)
152+
grpc_timeout = None if timeout == 0 else timeout
153+
self._logger.info(
154+
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete.")
155+
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(req, timeout=grpc_timeout)
152156
state = new_orchestration_state(req.instanceId, res)
153157
if not state:
154158
return None

tests/test_client.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
from unittest.mock import patch, ANY
1+
from unittest.mock import patch, ANY, Mock
22

3+
from durabletask.client import TaskHubGrpcClient
34
from durabletask.internal.shared import (DefaultClientInterceptorImpl,
45
get_default_host_address,
56
get_grpc_channel)
7+
import pytest
68

79
HOST_ADDRESS = 'localhost:50051'
810
METADATA = [('key1', 'value1'), ('key2', 'value2')]
@@ -85,4 +87,61 @@ def test_grpc_channel_with_host_name_protocol_stripping():
8587

8688
prefix = ""
8789
get_grpc_channel(prefix + host_name, METADATA, True)
88-
mock_secure_channel.assert_called_with(host_name, ANY)
90+
mock_secure_channel.assert_called_with(host_name, ANY)
91+
92+
93+
@pytest.mark.parametrize("timeout", [None, 0, 5])
94+
def test_wait_for_orchestration_start_timeout(timeout):
95+
instance_id = "test-instance"
96+
97+
from durabletask.internal.orchestrator_service_pb2 import GetInstanceResponse, \
98+
OrchestrationState, ORCHESTRATION_STATUS_RUNNING
99+
100+
response = GetInstanceResponse()
101+
state = OrchestrationState()
102+
state.instanceId = instance_id
103+
state.orchestrationStatus = ORCHESTRATION_STATUS_RUNNING
104+
response.orchestrationState.CopyFrom(state)
105+
106+
c = TaskHubGrpcClient()
107+
c._stub = Mock()
108+
c._stub.WaitForInstanceStart.return_value = response
109+
110+
grpc_timeout = None if timeout is None else timeout
111+
c.wait_for_orchestration_start(instance_id, timeout=grpc_timeout)
112+
113+
# Verify WaitForInstanceStart was called with timeout=None
114+
c._stub.WaitForInstanceStart.assert_called_once()
115+
_, kwargs = c._stub.WaitForInstanceStart.call_args
116+
if timeout is None or timeout == 0:
117+
assert kwargs.get('timeout') is None
118+
else:
119+
assert kwargs.get('timeout') == timeout
120+
121+
@pytest.mark.parametrize("timeout", [None, 0, 5])
122+
def test_wait_for_orchestration_completion_timeout(timeout):
123+
instance_id = "test-instance"
124+
125+
from durabletask.internal.orchestrator_service_pb2 import GetInstanceResponse, \
126+
OrchestrationState, ORCHESTRATION_STATUS_COMPLETED
127+
128+
response = GetInstanceResponse()
129+
state = OrchestrationState()
130+
state.instanceId = instance_id
131+
state.orchestrationStatus = ORCHESTRATION_STATUS_COMPLETED
132+
response.orchestrationState.CopyFrom(state)
133+
134+
c = TaskHubGrpcClient()
135+
c._stub = Mock()
136+
c._stub.WaitForInstanceCompletion.return_value = response
137+
138+
grpc_timeout = None if timeout is None else timeout
139+
c.wait_for_orchestration_completion(instance_id, timeout=grpc_timeout)
140+
141+
# Verify WaitForInstanceStart was called with timeout=None
142+
c._stub.WaitForInstanceCompletion.assert_called_once()
143+
_, kwargs = c._stub.WaitForInstanceCompletion.call_args
144+
if timeout is None or timeout == 0:
145+
assert kwargs.get('timeout') is None
146+
else:
147+
assert kwargs.get('timeout') == timeout

0 commit comments

Comments
 (0)