Skip to content

Commit 6a2ab15

Browse files
authored
Fix and improve connection handling, add concurrency options, prep for release (microsoft#50)
* Reconnect upon connection error * concurrency * Test updates * More updates * more concurrency stuff * final touches * fix import * update log level * fix exports * more fixup * test updateS * more test imports * fix github workflow pytest * cleanup tests * Python 3.9 specific test fix * fixup reconnection for new concurrency model * autopep8 * Remove existing duplicate import
1 parent 1459a55 commit 6a2ab15

File tree

12 files changed

+1241
-202
lines changed

12 files changed

+1241
-202
lines changed

.github/workflows/pr-validation.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ jobs:
2525
uses: actions/setup-python@v5
2626
with:
2727
python-version: ${{ matrix.python-version }}
28-
- name: Install durabletask dependencies
28+
- name: Install durabletask dependencies and the library itself in editable mode
2929
run: |
3030
python -m pip install --upgrade pip
3131
pip install flake8 pytest
3232
pip install -r requirements.txt
33+
pip install -e .
3334
- name: Install durabletask-azuremanaged dependencies
3435
working-directory: examples/dts
3536
run: |

CHANGELOG.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,23 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
## v0.2.0 (Unreleased)
8+
## v0.3.0
9+
10+
### New
11+
12+
- Added `ConcurrencyOptions` class for fine-grained concurrency control with separate limits for activities and orchestrations. The thread pool worker count can also be configured.
13+
14+
### Fixed
15+
16+
- Fixed an issue where a worker could not recover after its connection was interrupted or severed
17+
18+
## v0.2.1
919

1020
### New
1121

1222
- Added `set_custom_status` orchestrator API ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
1323
- Added `purge_orchestration` client API ([#34](https://github.com/microsoft/durabletask-python/pull/34)) - contributed by [@famarting](https://github.com/famarting)
14-
- Added new `durabletask-azuremanaged` package for use with the [Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) - by [@RyanLettieri](https://github.com/RyanLettieri)
24+
- Added new `durabletask-azuremanaged` package for use with the [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) - by [@RyanLettieri](https://github.com/RyanLettieri)
1525

1626
### Changes
1727

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,59 @@
55

66
from azure.core.credentials import TokenCredential
77

8-
from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import (
9-
DTSDefaultClientInterceptorImpl,
10-
)
11-
from durabletask.worker import TaskHubGrpcWorker
8+
from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
9+
DTSDefaultClientInterceptorImpl
10+
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
1211

1312

1413
# Worker class used for Durable Task Scheduler (DTS)
1514
class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
15+
"""A worker implementation for Azure Durable Task Scheduler (DTS).
16+
17+
This class extends TaskHubGrpcWorker to provide integration with Azure's
18+
Durable Task Scheduler service. It handles authentication via Azure credentials
19+
and configures the necessary gRPC interceptors for DTS communication.
20+
21+
Args:
22+
host_address (str): The gRPC endpoint address of the DTS service.
23+
taskhub (str): The name of the task hub. Cannot be empty.
24+
token_credential (Optional[TokenCredential]): Azure credential for authentication.
25+
If None, anonymous authentication will be used.
26+
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
27+
Defaults to True.
28+
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
29+
for controlling worker concurrency limits. If None, default concurrency
30+
settings will be used.
31+
32+
Raises:
33+
ValueError: If taskhub is empty or None.
34+
35+
Example:
36+
>>> from azure.identity import DefaultAzureCredential
37+
>>> from durabletask.azuremanaged import DurableTaskSchedulerWorker
38+
>>> from durabletask.worker import ConcurrencyOptions
39+
>>>
40+
>>> credential = DefaultAzureCredential()
41+
>>> concurrency = ConcurrencyOptions(max_concurrent_activities=10)
42+
>>> worker = DurableTaskSchedulerWorker(
43+
... host_address="my-dts-service.azure.com:443",
44+
... taskhub="my-task-hub",
45+
... token_credential=credential,
46+
... concurrency_options=concurrency
47+
... )
48+
49+
Note:
50+
This worker automatically configures DTS-specific gRPC interceptors
51+
for authentication and task hub routing. The parent class metadata
52+
parameter is set to None since authentication is handled by the
53+
DTS interceptor.
54+
"""
1655
def __init__(self, *,
1756
host_address: str,
1857
taskhub: str,
1958
token_credential: Optional[TokenCredential],
20-
secure_channel: bool = True):
59+
secure_channel: bool = True,
60+
concurrency_options: Optional[ConcurrencyOptions] = None):
2161

2262
if not taskhub:
2363
raise ValueError("The taskhub value cannot be empty.")
@@ -30,4 +70,5 @@ def __init__(self, *,
3070
host_address=host_address,
3171
secure_channel=secure_channel,
3272
metadata=None,
33-
interceptors=interceptors)
73+
interceptors=interceptors,
74+
concurrency_options=concurrency_options)

durabletask-azuremanaged/pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ build-backend = "setuptools.build_meta"
99

1010
[project]
1111
name = "durabletask.azuremanaged"
12-
version = "0.1.4"
13-
description = "Extensions for the Durable Task Python SDK for integrating with the Durable Task Scheduler in Azure"
12+
version = "0.2.0"
13+
description = "Durable Task Python SDK provider implementation for the Azure Durable Task Scheduler"
1414
keywords = [
1515
"durable",
1616
"task",
@@ -26,7 +26,7 @@ requires-python = ">=3.9"
2626
license = {file = "LICENSE"}
2727
readme = "README.md"
2828
dependencies = [
29-
"durabletask>=0.2.1",
29+
"durabletask>=0.3.0",
3030
"azure-identity>=1.19.0"
3131
]
3232

durabletask/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,8 @@
33

44
"""Durable Task SDK for Python"""
55

6+
from durabletask.worker import ConcurrencyOptions
7+
8+
__all__ = ["ConcurrencyOptions"]
69

710
PACKAGE_NAME = "durabletask"

0 commit comments

Comments
 (0)