
Commit 25aae7d

Merge remote-tracking branch 'upstream-ms/main' into update-upstream
Signed-off-by: Albert Callarisa <[email protected]>
2 parents 4602067 + dfec5da commit 25aae7d

22 files changed, +1399 −576 lines

CHANGELOG.md

Lines changed: 12 additions & 1 deletion
@@ -5,12 +5,23 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## v0.2.0 (Unreleased)
+## v0.3.0
+
+### New
+
+- Added `ConcurrencyOptions` class for fine-grained concurrency control with separate limits for activities and orchestrations. The thread pool worker count can also be configured.
+
+### Fixed
+
+- Fixed an issue where a worker could not recover after its connection was interrupted or severed
+
+## v0.2.1
 
 ### New
 
 - Added `set_custom_status` orchestrator API ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
 - Added `purge_orchestration` client API ([#34](https://github.com/microsoft/durabletask-python/pull/34)) - contributed by [@famarting](https://github.com/famarting)
+- Added new `durabletask-azuremanaged` package for use with the [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) - by [@RyanLettieri](https://github.com/RyanLettieri)
 
 ### Changes
 
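The `ConcurrencyOptions` entry above is the main functional addition in v0.3.0, but the class itself does not appear in this commit's hunks. A minimal usage sketch follows; the import path, keyword names, and worker parameter are assumptions inferred from the changelog wording, not taken from this diff.

    # Hypothetical sketch: ConcurrencyOptions is only named in the changelog here,
    # so the import path, keyword names, and worker parameter are assumptions.
    from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker

    options = ConcurrencyOptions(
        maximum_concurrent_activity_work_items=10,      # assumed: cap on parallel activities
        maximum_concurrent_orchestration_work_items=5,  # assumed: cap on parallel orchestrations
        maximum_thread_pool_workers=8,                  # assumed: thread pool size
    )
    worker = TaskHubGrpcWorker(concurrency_options=options)  # assumed keyword
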
dev-requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python
+grpcio-tools

durabletask/client.py

Lines changed: 22 additions & 4 deletions
@@ -6,7 +6,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
-from typing import Any, Optional, TypeVar, Union
+from typing import Any, Optional, Sequence, TypeVar, Union
 
 import grpc
 from google.protobuf import wrappers_pb2
@@ -16,6 +16,7 @@
 import durabletask.internal.orchestrator_service_pb2_grpc as stubs
 import durabletask.internal.shared as shared
 from durabletask import task
+from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
 
 TInput = TypeVar('TInput')
 TOutput = TypeVar('TOutput')
@@ -96,8 +97,25 @@ def __init__(self, *,
                  metadata: Optional[list[tuple[str, str]]] = None,
                  log_handler: Optional[logging.Handler] = None,
                  log_formatter: Optional[logging.Formatter] = None,
-                 secure_channel: bool = False):
-        channel = shared.get_grpc_channel(host_address, metadata, secure_channel=secure_channel)
+                 secure_channel: bool = False,
+                 interceptors: Optional[Sequence[shared.ClientInterceptor]] = None):
+
+        # If the caller provided metadata, we need to create a new interceptor for it and
+        # add it to the list of interceptors.
+        if interceptors is not None:
+            interceptors = list(interceptors)
+            if metadata is not None:
+                interceptors.append(DefaultClientInterceptorImpl(metadata))
+        elif metadata is not None:
+            interceptors = [DefaultClientInterceptorImpl(metadata)]
+        else:
+            interceptors = None
+
+        channel = shared.get_grpc_channel(
+            host_address=host_address,
+            secure_channel=secure_channel,
+            interceptors=interceptors
+        )
         self._stub = stubs.TaskHubSidecarServiceStub(channel)
         self._logger = shared.get_logger("client", log_handler, log_formatter)
 
@@ -116,7 +134,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
             scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
             version=wrappers_pb2.StringValue(value=""),
             orchestrationIdReusePolicy=reuse_id_policy,
-        )
+        )
 
         self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
         res: pb.CreateInstanceResponse = self._stub.StartInstance(req)

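The constructor change above keeps the existing `metadata` parameter working (it is wrapped in a `DefaultClientInterceptorImpl` internally) and adds an `interceptors` parameter; when both are supplied, the metadata interceptor is appended to the caller's list. A short sketch of both call styles, with an illustrative host address:

    from durabletask.client import TaskHubGrpcClient
    from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

    # Metadata-only: converted internally into a DefaultClientInterceptorImpl.
    client = TaskHubGrpcClient(
        host_address="localhost:4001",
        metadata=[("authorization", "Bearer <token>")],
    )

    # Interceptors directly; any metadata passed alongside is appended as one more interceptor.
    client = TaskHubGrpcClient(
        host_address="localhost:4001",
        interceptors=[DefaultClientInterceptorImpl([("x-custom-header", "value")])],
    )
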
durabletask/internal/grpc_interceptor.py

Lines changed: 6 additions & 6 deletions
@@ -19,28 +19,28 @@ class _ClientCallDetails(
 
 
 class DefaultClientInterceptorImpl (
-        grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
-        grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
+        grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
+        grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
     """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
-    StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
+    StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
     interceptor to add additional headers to all calls as needed."""
 
     def __init__(self, metadata: list[tuple[str, str]]):
         super().__init__()
         self._metadata = metadata
 
     def _intercept_call(
-            self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
+            self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
         """Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
         call details."""
         if self._metadata is None:
             return client_call_details
-
+
         if client_call_details.metadata is not None:
             metadata = list(client_call_details.metadata)
         else:
             metadata = []
-
+
         metadata.extend(self._metadata)
         client_call_details = _ClientCallDetails(
             client_call_details.method, client_call_details.timeout, metadata,

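The hunk above is whitespace-only; the interceptor's behavior is unchanged: it merges its stored metadata into each call's `ClientCallDetails`. For comparison, a custom interceptor written against the plain `grpc` API can be as small as the sketch below (class and header names are invented for illustration). It satisfies the `ClientInterceptor` union introduced in `shared.py`, although, unlike `DefaultClientInterceptorImpl`, it only covers unary-unary calls.

    import collections

    import grpc

    class _CallDetails(
            collections.namedtuple("_CallDetails", ("method", "timeout", "metadata", "credentials")),
            grpc.ClientCallDetails):
        pass

    class StaticHeaderInterceptor(grpc.UnaryUnaryClientInterceptor):
        """Illustrative interceptor that adds one fixed header to every unary-unary call."""

        def __init__(self, key: str, value: str):
            self._header = (key, value)

        def intercept_unary_unary(self, continuation, client_call_details, request):
            # Copy the existing metadata, append our header, and rebuild the call details.
            metadata = list(client_call_details.metadata or [])
            metadata.append(self._header)
            details = _CallDetails(
                client_call_details.method,
                client_call_details.timeout,
                metadata,
                client_call_details.credentials,
            )
            return continuation(details, request)
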
durabletask/internal/shared.py

Lines changed: 14 additions & 7 deletions
@@ -5,11 +5,16 @@
 import json
 import logging
 from types import SimpleNamespace
-from typing import Any, Optional
+from typing import Any, Optional, Sequence, Union
 
 import grpc
 
-from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
+ClientInterceptor = Union[
+    grpc.UnaryUnaryClientInterceptor,
+    grpc.UnaryStreamClientInterceptor,
+    grpc.StreamUnaryClientInterceptor,
+    grpc.StreamStreamClientInterceptor
+]
 
 # Field name used to indicate that an object was automatically serialized
 # and should be deserialized as a SimpleNamespace
@@ -25,8 +30,8 @@ def get_default_host_address() -> str:
 
 def get_grpc_channel(
         host_address: Optional[str],
-        metadata: Optional[list[tuple[str, str]]],
-        secure_channel: bool = False) -> grpc.Channel:
+        secure_channel: bool = False,
+        interceptors: Optional[Sequence[ClientInterceptor]] = None) -> grpc.Channel:
     if host_address is None:
         host_address = get_default_host_address()
 
@@ -44,16 +49,18 @@ def get_grpc_channel(
             host_address = host_address[len(protocol):]
             break
 
+    # Create the base channel
     if secure_channel:
         channel = grpc.secure_channel(host_address, grpc.ssl_channel_credentials())
     else:
         channel = grpc.insecure_channel(host_address)
 
-    if metadata is not None and len(metadata) > 0:
-        interceptors = [DefaultClientInterceptorImpl(metadata)]
+    # Apply interceptors ONLY if they exist
+    if interceptors:
        channel = grpc.intercept_channel(channel, *interceptors)
     return channel
 
+
 def get_logger(
         name_suffix: str,
         log_handler: Optional[logging.Handler] = None,
@@ -98,7 +105,7 @@ def default(self, obj):
         if dataclasses.is_dataclass(obj):
             # Dataclasses are not serializable by default, so we convert them to a dict and mark them for
             # automatic deserialization by the receiver
-            d = dataclasses.asdict(obj) # type: ignore
+            d = dataclasses.asdict(obj)  # type: ignore
             d[AUTO_SERIALIZED] = True
             return d
         elif isinstance(obj, SimpleNamespace):

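With the refactor above, `get_grpc_channel` no longer knows about metadata at all; callers hand it ready-made interceptors, and it wraps the channel with `grpc.intercept_channel` only when that list is non-empty. A minimal sketch of calling the helper directly (it is an internal module, normally reached through the client or worker constructors; the address is illustrative):

    import durabletask.internal.shared as shared
    from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

    # Every call on this channel carries the extra header added by the interceptor.
    channel = shared.get_grpc_channel(
        host_address="localhost:4001",
        secure_channel=False,
        interceptors=[DefaultClientInterceptorImpl([("x-request-source", "docs-example")])],
    )
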
durabletask/task.py

Lines changed: 3 additions & 3 deletions
@@ -333,7 +333,7 @@ class RetryableTask(CompletableTask[T]):
     """A task that can be retried according to a retry policy."""
 
     def __init__(self, retry_policy: RetryPolicy, action: pb.OrchestratorAction,
-                 start_time:datetime, is_sub_orch: bool) -> None:
+                 start_time: datetime, is_sub_orch: bool) -> None:
         super().__init__()
         self._action = action
         self._retry_policy = retry_policy
@@ -343,15 +343,15 @@ def __init__(self, retry_policy: RetryPolicy, action: pb.OrchestratorAction,
 
     def increment_attempt_count(self) -> None:
         self._attempt_count += 1
-
+
     def compute_next_delay(self) -> Optional[timedelta]:
         if self._attempt_count >= self._retry_policy.max_number_of_attempts:
             return None
 
         retry_expiration: datetime = datetime.max
         if self._retry_policy.retry_timeout is not None and self._retry_policy.retry_timeout != datetime.max:
             retry_expiration = self._start_time + self._retry_policy.retry_timeout
-
+
         if self._retry_policy.backoff_coefficient is None:
             backoff_coefficient = 1.0
         else:

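The `compute_next_delay` fragment above shows the guardrails (attempt cap, optional overall retry timeout, backoff coefficient defaulting to 1.0), but the hunk cuts off before the delay formula itself. Below is a standalone sketch of the exponential backoff such a policy typically computes; `first_retry_interval` and `max_retry_interval` are assumed names, not taken from this diff.

    from datetime import datetime, timedelta
    from typing import Optional

    def compute_backoff_delay(attempt_count: int,
                              max_number_of_attempts: int,
                              first_retry_interval: timedelta,
                              backoff_coefficient: Optional[float],
                              max_retry_interval: Optional[timedelta],
                              start_time: datetime,
                              retry_timeout: Optional[timedelta]) -> Optional[timedelta]:
        """Illustrative helper mirroring the structure visible in RetryableTask.compute_next_delay."""
        if attempt_count >= max_number_of_attempts:
            return None  # attempt budget exhausted, as in the diff

        # Stop retrying once the overall retry window has elapsed, as in the diff.
        if retry_timeout is not None and datetime.utcnow() >= start_time + retry_timeout:
            return None

        coefficient = 1.0 if backoff_coefficient is None else backoff_coefficient
        delay = first_retry_interval * (coefficient ** (attempt_count - 1))  # exponential growth
        if max_retry_interval is not None:
            delay = min(delay, max_retry_interval)  # assumed cap on a single delay
        return delay

    # Example: first interval 5s, coefficient 2.0 -> delays of 5s, 10s, 20s, ... up to the cap.
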