Skip to content

Commit 6fa6cfa

Browse files
authored
Merge branch 'main' into andystaples/minimize-entity-state-exposure
2 parents da6b55a + 5b453ed commit 6fa6cfa

File tree

11 files changed

+124
-22
lines changed

11 files changed

+124
-22
lines changed

.github/workflows/durabletask-azuremanaged.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ jobs:
1515
runs-on: ubuntu-latest
1616
steps:
1717
- uses: actions/checkout@v4
18-
- name: Set up Python 3.13
18+
- name: Set up Python 3.14
1919
uses: actions/setup-python@v5
2020
with:
21-
python-version: 3.13
21+
python-version: 3.14
2222
- name: Install dependencies
2323
working-directory: durabletask-azuremanaged
2424
run: |
@@ -36,7 +36,7 @@ jobs:
3636
strategy:
3737
fail-fast: false
3838
matrix:
39-
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
39+
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
4040
env:
4141
EMULATOR_VERSION: "latest"
4242
needs: lint
@@ -100,7 +100,7 @@ jobs:
100100
- name: Set up Python
101101
uses: actions/setup-python@v5
102102
with:
103-
python-version: "3.13" # Adjust Python version as needed
103+
python-version: "3.14" # Adjust Python version as needed
104104

105105
- name: Install dependencies
106106
run: |

.github/workflows/durabletask.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ jobs:
1515
runs-on: ubuntu-latest
1616
steps:
1717
- uses: actions/checkout@v4
18-
- name: Set up Python 3.13
18+
- name: Set up Python 3.14
1919
uses: actions/setup-python@v5
2020
with:
21-
python-version: 3.13
21+
python-version: 3.14
2222
- name: Install dependencies
2323
run: |
2424
python -m pip install --upgrade pip
@@ -38,7 +38,7 @@ jobs:
3838
strategy:
3939
fail-fast: false
4040
matrix:
41-
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
41+
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
4242
needs: lint-and-unit-tests
4343
runs-on: ubuntu-latest
4444
steps:
@@ -85,7 +85,7 @@ jobs:
8585
- name: Set up Python
8686
uses: actions/setup-python@v5
8787
with:
88-
python-version: "3.13" # Adjust Python version as needed
88+
python-version: "3.14" # Adjust Python version as needed
8989

9090
- name: Install dependencies
9191
run: |

docs/features.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,21 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and
148148

149149
### Retry policies
150150

151-
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
151+
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
152+
153+
### Logging configuration
154+
155+
Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and DurableTaskSchedulerClient for durabletask-azuremanaged) accept a log_handler and log_formatter object from `logging`. These can be used to customize verbosity, output location, and format of logs emitted by these sources.
156+
157+
For example, to output logs to a file called `worker.log` at level `DEBUG`, the following syntax might apply:
158+
159+
```python
160+
log_handler = logging.FileHandler('durable.log', encoding='utf-8')
161+
log_handler.setLevel(logging.DEBUG)
162+
163+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
164+
taskhub=taskhub_name, token_credential=credential, log_handler=log_handler) as w:
165+
```
166+
167+
**NOTE**
168+
The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled.

durabletask-azuremanaged/durabletask/azuremanaged/client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import logging
5+
46
from typing import Optional
57

68
from azure.core.credentials import TokenCredential
@@ -18,7 +20,9 @@ def __init__(self, *,
1820
taskhub: str,
1921
token_credential: Optional[TokenCredential],
2022
secure_channel: bool = True,
21-
default_version: Optional[str] = None):
23+
default_version: Optional[str] = None,
24+
log_handler: Optional[logging.Handler] = None,
25+
log_formatter: Optional[logging.Formatter] = None):
2226

2327
if not taskhub:
2428
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
@@ -31,5 +35,7 @@ def __init__(self, *,
3135
host_address=host_address,
3236
secure_channel=secure_channel,
3337
metadata=None,
38+
log_handler=log_handler,
39+
log_formatter=log_formatter,
3440
interceptors=interceptors,
3541
default_version=default_version)

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import logging
5+
46
from typing import Optional
57

68
from azure.core.credentials import TokenCredential
@@ -28,6 +30,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
2830
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
2931
for controlling worker concurrency limits. If None, default concurrency
3032
settings will be used.
33+
log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs.
34+
log_formatter (Optional[logging.Formatter], optional): Custom log formatter for worker logs.
3135
3236
Raises:
3337
ValueError: If taskhub is empty or None.
@@ -52,12 +56,15 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
5256
parameter is set to None since authentication is handled by the
5357
DTS interceptor.
5458
"""
59+
5560
def __init__(self, *,
5661
host_address: str,
5762
taskhub: str,
5863
token_credential: Optional[TokenCredential],
5964
secure_channel: bool = True,
60-
concurrency_options: Optional[ConcurrencyOptions] = None):
65+
concurrency_options: Optional[ConcurrencyOptions] = None,
66+
log_handler: Optional[logging.Handler] = None,
67+
log_formatter: Optional[logging.Formatter] = None):
6168

6269
if not taskhub:
6370
raise ValueError("The taskhub value cannot be empty.")
@@ -70,5 +77,7 @@ def __init__(self, *,
7077
host_address=host_address,
7178
secure_channel=secure_channel,
7279
metadata=None,
80+
log_handler=log_handler,
81+
log_formatter=log_formatter,
7382
interceptors=interceptors,
7483
concurrency_options=concurrency_options)

durabletask-azuremanaged/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ classifiers = [
2222
"Programming Language :: Python :: 3",
2323
"License :: OSI Approved :: MIT License",
2424
]
25-
requires-python = ">=3.9"
25+
requires-python = ">=3.10"
2626
license = {file = "LICENSE"}
2727
readme = "README.md"
2828
dependencies = [

durabletask/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def lock_entities(self, entities: list[EntityInstanceId]) -> Task[EntityLock]:
201201
pass
202202

203203
@abstractmethod
204-
def call_sub_orchestrator(self, orchestrator: Orchestrator[TInput, TOutput], *,
204+
def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput], str], *,
205205
input: Optional[TInput] = None,
206206
instance_id: Optional[str] = None,
207207
retry_policy: Optional[RetryPolicy] = None,

durabletask/worker.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ class TaskHubGrpcWorker:
246246
Defaults to the value from environment variables or localhost.
247247
metadata (Optional[list[tuple[str, str]]], optional): gRPC metadata to include with
248248
requests. Used for authentication and routing. Defaults to None.
249-
log_handler (optional): Custom logging handler for worker logs. Defaults to None.
249+
log_handler (optional[logging.Handler]): Custom logging handler for worker logs. Defaults to None.
250250
log_formatter (Optional[logging.Formatter], optional): Custom log formatter.
251251
Defaults to None.
252252
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
@@ -314,7 +314,7 @@ def __init__(
314314
*,
315315
host_address: Optional[str] = None,
316316
metadata: Optional[list[tuple[str, str]]] = None,
317-
log_handler=None,
317+
log_handler: Optional[logging.Handler] = None,
318318
log_formatter: Optional[logging.Formatter] = None,
319319
secure_channel: bool = False,
320320
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
@@ -1029,15 +1029,18 @@ def lock_entities(self, entities: list[EntityInstanceId]) -> task.Task[EntityLoc
10291029

10301030
def call_sub_orchestrator(
10311031
self,
1032-
orchestrator: task.Orchestrator[TInput, TOutput],
1032+
orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
10331033
*,
10341034
input: Optional[TInput] = None,
10351035
instance_id: Optional[str] = None,
10361036
retry_policy: Optional[task.RetryPolicy] = None,
10371037
version: Optional[str] = None,
10381038
) -> task.Task[TOutput]:
10391039
id = self.next_sequence_number()
1040-
orchestrator_name = task.get_name(orchestrator)
1040+
if isinstance(orchestrator, str):
1041+
orchestrator_name = orchestrator
1042+
else:
1043+
orchestrator_name = task.get_name(orchestrator)
10411044
default_version = self._registry.versioning.default_version if self._registry.versioning else None
10421045
orchestrator_version = version if version else default_version
10431046
self.call_activity_function_helper(
@@ -1232,6 +1235,15 @@ def execute(
12321235
old_events: Sequence[pb.HistoryEvent],
12331236
new_events: Sequence[pb.HistoryEvent],
12341237
) -> ExecutionResults:
1238+
orchestration_name = "<unknown>"
1239+
orchestration_started_events = [e for e in old_events if e.HasField("executionStarted")]
1240+
if len(orchestration_started_events) >= 1:
1241+
orchestration_name = orchestration_started_events[0].executionStarted.name
1242+
1243+
self._logger.debug(
1244+
f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..."
1245+
)
1246+
12351247
if not new_events:
12361248
raise task.OrchestrationStateError(
12371249
"The new history event list must have at least one event in it."
@@ -1268,13 +1280,15 @@ def execute(
12681280

12691281
except Exception as ex:
12701282
# Unhandled exceptions fail the orchestration
1283+
self._logger.debug(f"{instance_id}: Orchestration {orchestration_name} failed")
12711284
ctx.set_failed(ex)
12721285

12731286
if not ctx._is_complete:
12741287
task_count = len(ctx._pending_tasks)
12751288
event_count = len(ctx._pending_events)
12761289
self._logger.info(
1277-
f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding."
1290+
f"{instance_id}: Orchestrator {orchestration_name} yielded with {task_count} task(s) "
1291+
f"and {event_count} event(s) outstanding."
12781292
)
12791293
elif (
12801294
ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
@@ -1283,7 +1297,7 @@ def execute(
12831297
ctx._completion_status
12841298
)
12851299
self._logger.info(
1286-
f"{instance_id}: Orchestration completed with status: {completion_status_str}"
1300+
f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}"
12871301
)
12881302

12891303
actions = ctx.get_actions()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ classifiers = [
2121
"Programming Language :: Python :: 3",
2222
"License :: OSI Approved :: MIT License",
2323
]
24-
requires-python = ">=3.9"
24+
requires-python = ">=3.10"
2525
license = {file = "LICENSE"}
2626
readme = "README.md"
2727
dependencies = [

tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,34 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
175175
assert activity_counter == 30
176176

177177

178+
def test_sub_orchestrator_by_name():
179+
sub_orchestrator_counter = 0
180+
181+
def orchestrator_child(ctx: task.OrchestrationContext, _):
182+
nonlocal sub_orchestrator_counter
183+
sub_orchestrator_counter += 1
184+
185+
def parent_orchestrator(ctx: task.OrchestrationContext, _):
186+
yield ctx.call_sub_orchestrator("orchestrator_child")
187+
188+
# Start a worker, which will connect to the sidecar in a background thread
189+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
190+
taskhub=taskhub_name, token_credential=None) as w:
191+
w.add_orchestrator(orchestrator_child)
192+
w.add_orchestrator(parent_orchestrator)
193+
w.start()
194+
195+
task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
196+
taskhub=taskhub_name, token_credential=None)
197+
id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=None)
198+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
199+
200+
assert state is not None
201+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
202+
assert state.failure_details is None
203+
assert sub_orchestrator_counter == 1
204+
205+
178206
def test_wait_for_multiple_external_events():
179207
def orchestrator(ctx: task.OrchestrationContext, _):
180208
a = yield ctx.wait_for_external_event('A')
@@ -267,7 +295,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
267295
# try:
268296
# state = task_hub_client.wait_for_orchestration_completion(id, timeout=3)
269297
# assert False, "Orchestration should not have completed"
270-
# except TimeoutError:
298+
# except (TimeoutError, _InactiveRpcError):
271299
# pass
272300

273301
# # Resume the orchestration and wait for it to complete

0 commit comments

Comments
 (0)