|
6 | 6 | from dataclasses import dataclass |
7 | 7 | from datetime import datetime |
8 | 8 | from enum import Enum |
9 | | -from typing import Any, List, Optional, Sequence, TypeVar, Union |
| 9 | +from typing import Any, Generic, List, Optional, Sequence, TypeVar, Union |
10 | 10 |
|
11 | 11 | import grpc |
12 | 12 | import grpc.aio |
13 | 13 |
|
| 14 | +import durabletask.history as history |
14 | 15 | from durabletask.entities import EntityInstanceId |
15 | 16 | from durabletask.entities.entity_metadata import EntityMetadata |
16 | 17 | import durabletask.internal.helpers as helpers |
| 18 | +import durabletask.internal.history_helpers as history_helpers |
17 | 19 | import durabletask.internal.orchestrator_service_pb2 as pb |
18 | 20 | import durabletask.internal.orchestrator_service_pb2_grpc as stubs |
19 | 21 | import durabletask.internal.shared as shared |
|
37 | 39 |
|
38 | 40 | TInput = TypeVar('TInput') |
39 | 41 | TOutput = TypeVar('TOutput') |
| 42 | +TItem = TypeVar('TItem') |
40 | 43 |
|
41 | 44 |
|
42 | 45 | class OrchestrationStatus(Enum): |
@@ -99,6 +102,12 @@ class PurgeInstancesResult: |
99 | 102 | is_complete: bool |
100 | 103 |
|
101 | 104 |
|
| 105 | +@dataclass |
| 106 | +class Page(Generic[TItem]): |
| 107 | + items: List[TItem] |
| 108 | + continuation_token: Optional[str] |
| 109 | + |
| 110 | + |
102 | 111 | @dataclass |
103 | 112 | class CleanEntityStorageResult: |
104 | 113 | empty_entities_removed: int |
@@ -218,6 +227,44 @@ def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = Tr |
218 | 227 | payload_helpers.deexternalize_payloads(res, self._payload_store) |
219 | 228 | return new_orchestration_state(req.instanceId, res) |
220 | 229 |
|
| 230 | + def get_orchestration_history(self, |
| 231 | + instance_id: str, *, |
| 232 | + execution_id: Optional[str] = None, |
| 233 | + for_work_item_processing: bool = False) -> List[history.HistoryEvent]: |
| 234 | + req = pb.StreamInstanceHistoryRequest( |
| 235 | + instanceId=instance_id, |
| 236 | + executionId=helpers.get_string_value(execution_id), |
| 237 | + forWorkItemProcessing=for_work_item_processing, |
| 238 | + ) |
| 239 | + self._logger.info(f"Retrieving history for instance '{instance_id}'.") |
| 240 | + stream = self._stub.StreamInstanceHistory(req) |
| 241 | + return history_helpers.collect_history_events(stream, self._payload_store) |
| 242 | + |
| 243 | + def list_instance_ids(self, |
| 244 | + runtime_status: Optional[List[OrchestrationStatus]] = None, |
| 245 | + completed_time_from: Optional[datetime] = None, |
| 246 | + completed_time_to: Optional[datetime] = None, |
| 247 | + page_size: Optional[int] = None, |
| 248 | + continuation_token: Optional[str] = None) -> Page[str]: |
| 249 | + req = pb.ListInstanceIdsRequest( |
| 250 | + runtimeStatus=[status.value for status in runtime_status] if runtime_status else None, |
| 251 | + completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None, |
| 252 | + completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None, |
| 253 | + pageSize=page_size or 0, |
| 254 | + lastInstanceKey=helpers.get_string_value(continuation_token), |
| 255 | + ) |
| 256 | + self._logger.info( |
| 257 | + "Listing terminal instance IDs with filters: " |
| 258 | + f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, " |
| 259 | + f"completed_time_from={completed_time_from}, " |
| 260 | + f"completed_time_to={completed_time_to}, " |
| 261 | + f"page_size={page_size}, " |
| 262 | + f"continuation_token={continuation_token}" |
| 263 | + ) |
| 264 | + resp: pb.ListInstanceIdsResponse = self._stub.ListInstanceIds(req) |
| 265 | + next_token = resp.lastInstanceKey.value if resp.HasField("lastInstanceKey") else None |
| 266 | + return Page(items=list(resp.instanceIds), continuation_token=next_token) |
| 267 | + |
221 | 268 | def get_all_orchestration_states(self, |
222 | 269 | orchestration_query: Optional[OrchestrationQuery] = None |
223 | 270 | ) -> List[OrchestrationState]: |
@@ -502,6 +549,44 @@ async def get_orchestration_state(self, instance_id: str, *, |
502 | 549 | await payload_helpers.deexternalize_payloads_async(res, self._payload_store) |
503 | 550 | return new_orchestration_state(req.instanceId, res) |
504 | 551 |
|
| 552 | + async def get_orchestration_history(self, |
| 553 | + instance_id: str, *, |
| 554 | + execution_id: Optional[str] = None, |
| 555 | + for_work_item_processing: bool = False) -> List[history.HistoryEvent]: |
| 556 | + req = pb.StreamInstanceHistoryRequest( |
| 557 | + instanceId=instance_id, |
| 558 | + executionId=helpers.get_string_value(execution_id), |
| 559 | + forWorkItemProcessing=for_work_item_processing, |
| 560 | + ) |
| 561 | + self._logger.info(f"Retrieving history for instance '{instance_id}'.") |
| 562 | + stream = self._stub.StreamInstanceHistory(req) |
| 563 | + return await history_helpers.collect_history_events_async(stream, self._payload_store) |
| 564 | + |
| 565 | + async def list_instance_ids(self, |
| 566 | + runtime_status: Optional[List[OrchestrationStatus]] = None, |
| 567 | + completed_time_from: Optional[datetime] = None, |
| 568 | + completed_time_to: Optional[datetime] = None, |
| 569 | + page_size: Optional[int] = None, |
| 570 | + continuation_token: Optional[str] = None) -> Page[str]: |
| 571 | + req = pb.ListInstanceIdsRequest( |
| 572 | + runtimeStatus=[status.value for status in runtime_status] if runtime_status else None, |
| 573 | + completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None, |
| 574 | + completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None, |
| 575 | + pageSize=page_size or 0, |
| 576 | + lastInstanceKey=helpers.get_string_value(continuation_token), |
| 577 | + ) |
| 578 | + self._logger.info( |
| 579 | + "Listing terminal instance IDs with filters: " |
| 580 | + f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, " |
| 581 | + f"completed_time_from={completed_time_from}, " |
| 582 | + f"completed_time_to={completed_time_to}, " |
| 583 | + f"page_size={page_size}, " |
| 584 | + f"continuation_token={continuation_token}" |
| 585 | + ) |
| 586 | + resp: pb.ListInstanceIdsResponse = await self._stub.ListInstanceIds(req) |
| 587 | + next_token = resp.lastInstanceKey.value if resp.HasField("lastInstanceKey") else None |
| 588 | + return Page(items=list(resp.instanceIds), continuation_token=next_token) |
| 589 | + |
505 | 590 | async def get_all_orchestration_states(self, |
506 | 591 | orchestration_query: Optional[OrchestrationQuery] = None |
507 | 592 | ) -> List[OrchestrationState]: |
|
0 commit comments