Skip to content

Commit a13d4eb

Browse files
committed
Support scheduling Activities within Workflows
1 parent afef579 commit a13d4eb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2699
-2322
lines changed

cadence/_internal/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,3 @@
66
"""
77

88
__all__: list[str] = []
9-
10-

cadence/_internal/activity/_activity_executor.py

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,33 @@
88
from cadence._internal.activity._context import _Context, _SyncContext
99
from cadence.activity import ActivityInfo, ActivityDefinition, ExecutionStrategy
1010
from cadence.api.v1.common_pb2 import Failure
11-
from cadence.api.v1.service_worker_pb2 import PollForActivityTaskResponse, RespondActivityTaskFailedRequest, \
12-
RespondActivityTaskCompletedRequest
11+
from cadence.api.v1.service_worker_pb2 import (
12+
PollForActivityTaskResponse,
13+
RespondActivityTaskFailedRequest,
14+
RespondActivityTaskCompletedRequest,
15+
)
1316
from cadence.client import Client
1417

1518
_logger = getLogger(__name__)
1619

20+
1721
class ActivityExecutor:
18-
def __init__(self, client: Client, task_list: str, identity: str, max_workers: int, registry: Callable[[str], ActivityDefinition]):
22+
def __init__(
23+
self,
24+
client: Client,
25+
task_list: str,
26+
identity: str,
27+
max_workers: int,
28+
registry: Callable[[str], ActivityDefinition],
29+
):
1930
self._client = client
2031
self._data_converter = client.data_converter
2132
self._registry = registry
2233
self._identity = identity
2334
self._task_list = task_list
24-
self._thread_pool = ThreadPoolExecutor(max_workers=max_workers,
25-
thread_name_prefix=f'{task_list}-activity-')
35+
self._thread_pool = ThreadPoolExecutor(
36+
max_workers=max_workers, thread_name_prefix=f"{task_list}-activity-"
37+
)
2638

2739
async def execute(self, task: PollForActivityTaskResponse):
2840
try:
@@ -46,27 +58,33 @@ def _create_context(self, task: PollForActivityTaskResponse) -> _Context:
4658
else:
4759
return _SyncContext(self._client, info, activity_def, self._thread_pool)
4860

49-
async def _report_failure(self, task: PollForActivityTaskResponse, error: Exception):
61+
async def _report_failure(
62+
self, task: PollForActivityTaskResponse, error: Exception
63+
):
5064
try:
51-
await self._client.worker_stub.RespondActivityTaskFailed(RespondActivityTaskFailedRequest(
52-
task_token=task.task_token,
53-
failure=_to_failure(error),
54-
identity=self._identity,
55-
))
65+
await self._client.worker_stub.RespondActivityTaskFailed(
66+
RespondActivityTaskFailedRequest(
67+
task_token=task.task_token,
68+
failure=_to_failure(error),
69+
identity=self._identity,
70+
)
71+
)
5672
except Exception:
57-
_logger.exception('Exception reporting activity failure')
73+
_logger.exception("Exception reporting activity failure")
5874

5975
async def _report_success(self, task: PollForActivityTaskResponse, result: Any):
60-
as_payload = await self._data_converter.to_data([result])
76+
as_payload = self._data_converter.to_data([result])
6177

6278
try:
63-
await self._client.worker_stub.RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest(
64-
task_token=task.task_token,
65-
result=as_payload,
66-
identity=self._identity,
67-
))
79+
await self._client.worker_stub.RespondActivityTaskCompleted(
80+
RespondActivityTaskCompletedRequest(
81+
task_token=task.task_token,
82+
result=as_payload,
83+
identity=self._identity,
84+
)
85+
)
6886
except Exception:
69-
_logger.exception('Exception reporting activity complete')
87+
_logger.exception("Exception reporting activity complete")
7088

7189
def _create_info(self, task: PollForActivityTaskResponse) -> ActivityInfo:
7290
return ActivityInfo(

cadence/_internal/activity/_context.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,28 @@ def __init__(self, client: Client, info: ActivityInfo, activity_fn: ActivityDefi
1414
self._activity_fn = activity_fn
1515

1616
async def execute(self, payload: Payload) -> Any:
17-
params = await self._to_params(payload)
17+
params = self._to_params(payload)
1818
with self._activate():
1919
return await self._activity_fn(*params)
2020

21-
async def _to_params(self, payload: Payload) -> list[Any]:
21+
def _to_params(self, payload: Payload) -> list[Any]:
2222
type_hints = [param.type_hint for param in self._activity_fn.params]
23-
return await self._client.data_converter.from_data(payload, type_hints)
23+
return self._client.data_converter.from_data(payload, type_hints)
2424

2525
def client(self) -> Client:
2626
return self._client
2727

2828
def info(self) -> ActivityInfo:
2929
return self._info
3030

31+
3132
class _SyncContext(_Context):
3233
def __init__(self, client: Client, info: ActivityInfo, activity_fn: ActivityDefinition[[Any], Any], executor: ThreadPoolExecutor):
3334
super().__init__(client, info, activity_fn)
3435
self._executor = executor
3536

3637
async def execute(self, payload: Payload) -> Any:
37-
params = await self._to_params(payload)
38+
params = self._to_params(payload)
3839
loop = asyncio.get_running_loop()
3940
return await loop.run_in_executor(self._executor, self._run, params)
4041

@@ -44,4 +45,3 @@ def _run(self, args: list[Any]) -> Any:
4445

4546
def client(self) -> Client:
4647
raise RuntimeError("client is only supported in async activities")
47-

0 commit comments

Comments
 (0)