Skip to content

Commit f5fb2ff

Browse files
authored
Add history retrieval and terminal instance ID paging APIs (#133)
* Add history retrieval and terminal instance ID paging APIs
1 parent e613e57 commit f5fb2ff

File tree

9 files changed

+987
-5
lines changed

9 files changed

+987
-5
lines changed

.github/copilot-instructions.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,19 @@ building durable orchestrations. The repo contains two packages:
1515
- Update `CHANGELOG.md` for core SDK changes and
1616
`durabletask-azuremanaged/CHANGELOG.md` for provider changes.
1717
- If a change affects both packages, update both changelogs.
18+
- Include changelog entries for externally observable outcomes only, such as
19+
new public APIs, behavior changes, bug fixes users can notice, breaking
20+
changes, and new configuration capabilities.
1821
- Do NOT document internal-only changes in changelogs, including CI/workflow
1922
updates, test-only changes, refactors with no user-visible behavior change,
2023
and implementation details that do not affect public behavior or API.
24+
- When in doubt, write the changelog entry in terms of user impact (what users
25+
can now do or what behavior changed), not implementation mechanism (how it
26+
was implemented internally).
27+
28+
Examples:
29+
- Include: "Added `get_orchestration_history()` to retrieve orchestration history from the client."
30+
- Exclude: "Added internal helper functions to aggregate streamed history chunks."
2131

2232
## Language and Style
2333

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
ADDED
11+
12+
- Added `get_orchestration_history()` and `list_instance_ids()` to the sync and async gRPC clients.
13+
- Added in-memory backend support for `StreamInstanceHistory` and `ListInstanceIds` so local orchestration tests can retrieve history and page terminal instance IDs by completion window.
14+
1015
## v1.4.0
1116

1217
ADDED

durabletask/client.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66
from dataclasses import dataclass
77
from datetime import datetime
88
from enum import Enum
9-
from typing import Any, List, Optional, Sequence, TypeVar, Union
9+
from typing import Any, Generic, List, Optional, Sequence, TypeVar, Union
1010

1111
import grpc
1212
import grpc.aio
1313

14+
import durabletask.history as history
1415
from durabletask.entities import EntityInstanceId
1516
from durabletask.entities.entity_metadata import EntityMetadata
1617
import durabletask.internal.helpers as helpers
18+
import durabletask.internal.history_helpers as history_helpers
1719
import durabletask.internal.orchestrator_service_pb2 as pb
1820
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
1921
import durabletask.internal.shared as shared
@@ -37,6 +39,7 @@
3739

3840
TInput = TypeVar('TInput')
3941
TOutput = TypeVar('TOutput')
42+
TItem = TypeVar('TItem')
4043

4144

4245
class OrchestrationStatus(Enum):
@@ -99,6 +102,12 @@ class PurgeInstancesResult:
99102
is_complete: bool
100103

101104

105+
@dataclass
106+
class Page(Generic[TItem]):
107+
items: List[TItem]
108+
continuation_token: Optional[str]
109+
110+
102111
@dataclass
103112
class CleanEntityStorageResult:
104113
empty_entities_removed: int
@@ -218,6 +227,44 @@ def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = Tr
218227
payload_helpers.deexternalize_payloads(res, self._payload_store)
219228
return new_orchestration_state(req.instanceId, res)
220229

230+
def get_orchestration_history(self,
231+
instance_id: str, *,
232+
execution_id: Optional[str] = None,
233+
for_work_item_processing: bool = False) -> List[history.HistoryEvent]:
234+
req = pb.StreamInstanceHistoryRequest(
235+
instanceId=instance_id,
236+
executionId=helpers.get_string_value(execution_id),
237+
forWorkItemProcessing=for_work_item_processing,
238+
)
239+
self._logger.info(f"Retrieving history for instance '{instance_id}'.")
240+
stream = self._stub.StreamInstanceHistory(req)
241+
return history_helpers.collect_history_events(stream, self._payload_store)
242+
243+
def list_instance_ids(self,
244+
runtime_status: Optional[List[OrchestrationStatus]] = None,
245+
completed_time_from: Optional[datetime] = None,
246+
completed_time_to: Optional[datetime] = None,
247+
page_size: Optional[int] = None,
248+
continuation_token: Optional[str] = None) -> Page[str]:
249+
req = pb.ListInstanceIdsRequest(
250+
runtimeStatus=[status.value for status in runtime_status] if runtime_status else [],
251+
completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None,
252+
completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None,
253+
pageSize=page_size or 0,
254+
lastInstanceKey=helpers.get_string_value(continuation_token),
255+
)
256+
self._logger.info(
257+
"Listing terminal instance IDs with filters: "
258+
f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, "
259+
f"completed_time_from={completed_time_from}, "
260+
f"completed_time_to={completed_time_to}, "
261+
f"page_size={page_size}, "
262+
f"continuation_token={continuation_token}"
263+
)
264+
resp: pb.ListInstanceIdsResponse = self._stub.ListInstanceIds(req)
265+
next_token = resp.lastInstanceKey.value if resp.HasField("lastInstanceKey") else None
266+
return Page(items=list(resp.instanceIds), continuation_token=next_token)
267+
221268
def get_all_orchestration_states(self,
222269
orchestration_query: Optional[OrchestrationQuery] = None
223270
) -> List[OrchestrationState]:
@@ -502,6 +549,44 @@ async def get_orchestration_state(self, instance_id: str, *,
502549
await payload_helpers.deexternalize_payloads_async(res, self._payload_store)
503550
return new_orchestration_state(req.instanceId, res)
504551

552+
async def get_orchestration_history(self,
553+
instance_id: str, *,
554+
execution_id: Optional[str] = None,
555+
for_work_item_processing: bool = False) -> List[history.HistoryEvent]:
556+
req = pb.StreamInstanceHistoryRequest(
557+
instanceId=instance_id,
558+
executionId=helpers.get_string_value(execution_id),
559+
forWorkItemProcessing=for_work_item_processing,
560+
)
561+
self._logger.info(f"Retrieving history for instance '{instance_id}'.")
562+
stream = self._stub.StreamInstanceHistory(req)
563+
return await history_helpers.collect_history_events_async(stream, self._payload_store)
564+
565+
async def list_instance_ids(self,
566+
runtime_status: Optional[List[OrchestrationStatus]] = None,
567+
completed_time_from: Optional[datetime] = None,
568+
completed_time_to: Optional[datetime] = None,
569+
page_size: Optional[int] = None,
570+
continuation_token: Optional[str] = None) -> Page[str]:
571+
req = pb.ListInstanceIdsRequest(
572+
runtimeStatus=[status.value for status in runtime_status] if runtime_status else [],
573+
completedTimeFrom=helpers.new_timestamp(completed_time_from) if completed_time_from else None,
574+
completedTimeTo=helpers.new_timestamp(completed_time_to) if completed_time_to else None,
575+
pageSize=page_size or 0,
576+
lastInstanceKey=helpers.get_string_value(continuation_token),
577+
)
578+
self._logger.info(
579+
"Listing terminal instance IDs with filters: "
580+
f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, "
581+
f"completed_time_from={completed_time_from}, "
582+
f"completed_time_to={completed_time_to}, "
583+
f"page_size={page_size}, "
584+
f"continuation_token={continuation_token}"
585+
)
586+
resp: pb.ListInstanceIdsResponse = await self._stub.ListInstanceIds(req)
587+
next_token = resp.lastInstanceKey.value if resp.HasField("lastInstanceKey") else None
588+
return Page(items=list(resp.instanceIds), continuation_token=next_token)
589+
505590
async def get_all_orchestration_states(self,
506591
orchestration_query: Optional[OrchestrationQuery] = None
507592
) -> List[OrchestrationState]:

0 commit comments

Comments
 (0)