
Update upstream #12


Open · wants to merge 20 commits into base: main
Commits (20):
- d53cf5c: Update version to 0.2b1, require Python 3.9+, and enhance GitHub Acti… (cgillum, Jan 6, 2025)
- 4a303cb: Downgrade required `grpcio` and `protobuf` versions (#36) (berndverst, Jan 8, 2025)
- 2466e7d: Remove protocol prefix from host name and auto-configure secure mode … (berndverst, Jan 10, 2025)
- 551cb02: Improve Proto Generation: Download proto file directly instead of via… (berndverst, Jan 17, 2025)
- 37544cf: remove gitmodule file (#41) (YunchuWang, Jan 23, 2025)
- 2bdf87f: Creation of DTS example and passing of completionToken (#40) (RyanLettieri, Feb 18, 2025)
- 6d3ad8f: Update pr-validation.yml (berndverst, Mar 10, 2025)
- 75f573b: Making token credential optional (#45) (RyanLettieri, Mar 10, 2025)
- aae0267: Creation of pipeline to publish dts python package to pypi (#43) (RyanLettieri, Mar 21, 2025)
- 62d2014: Add missing protobuf dependency (berndverst, Mar 26, 2025)
- 04fe991: Add user agent (#49) (berndverst, May 6, 2025)
- e6be3d6: Bump azuremanaged version for release (berndverst, May 20, 2025)
- c9704b3: Fix and improve connection handling, add concurrency options, prep fo… (berndverst, Jun 3, 2025)
- 43a4453: Update GitHub workflows and automate release (#51) (berndverst, Jun 3, 2025)
- dfec5da: Updates instructions for running e2e tests to match CI (#37) (elena-kolevska, Jun 3, 2025)
- 25aae7d: Merge remote-tracking branch 'upstream-ms/main' into update-upstream (acroca, Jun 16, 2025)
- 89437eb: Bring examples back (acroca, Jun 18, 2025)
- 3439afc: Remove misleading entry in changelog (acroca, Jun 18, 2025)
- 76033a2: Fixed examples readme with specific dapr instructions (acroca, Jun 18, 2025)
- f9e2bf5: Use pinned grpcio-tools version (acroca, Jun 30, 2025)
Files changed:
CHANGELOG.md (12 changes: 11 additions & 1 deletion)

@@ -5,7 +5,17 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## v0.2.0 (Unreleased)
+## v0.3.0
+
+### New
+
+- Added `ConcurrencyOptions` class for fine-grained concurrency control with separate limits for activities and orchestrations. The thread pool worker count can also be configured.
+
+### Fixed
+
+- Fixed an issue where a worker could not recover after its connection was interrupted or severed
+
+## v0.2.1
 
 ### New
 
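For orientation, here is a hypothetical sketch of how the new `ConcurrencyOptions` class might be used. The constructor parameter names and the `concurrency_options` keyword are assumptions inferred from the changelog wording, not confirmed by this diff.

```python
# Hypothetical sketch only: parameter names below are assumed from the changelog
# wording ("separate limits for activities and orchestrations", configurable
# "thread pool worker count") and may not match the released API exactly.
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker

options = ConcurrencyOptions(
    maximum_concurrent_activity_work_items=8,       # assumed name: activity limit
    maximum_concurrent_orchestration_work_items=4,  # assumed name: orchestration limit
    maximum_thread_pool_workers=16,                 # assumed name: thread pool size
)

worker = TaskHubGrpcWorker(concurrency_options=options)  # assumed keyword argument
```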
durabletask/client.py (26 changes: 22 additions & 4 deletions)

@@ -6,7 +6,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
-from typing import Any, Optional, TypeVar, Union
+from typing import Any, Optional, Sequence, TypeVar, Union
 
 import grpc
 from google.protobuf import wrappers_pb2
@@ -16,6 +16,7 @@
 import durabletask.internal.orchestrator_service_pb2_grpc as stubs
 import durabletask.internal.shared as shared
 from durabletask import task
+from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
 
 TInput = TypeVar('TInput')
 TOutput = TypeVar('TOutput')
@@ -96,8 +97,25 @@ def __init__(self, *,
                  metadata: Optional[list[tuple[str, str]]] = None,
                  log_handler: Optional[logging.Handler] = None,
                  log_formatter: Optional[logging.Formatter] = None,
-                 secure_channel: bool = False):
-        channel = shared.get_grpc_channel(host_address, metadata, secure_channel=secure_channel)
+                 secure_channel: bool = False,
+                 interceptors: Optional[Sequence[shared.ClientInterceptor]] = None):
+
+        # If the caller provided metadata, we need to create a new interceptor for it and
+        # add it to the list of interceptors.
+        if interceptors is not None:
+            interceptors = list(interceptors)
+            if metadata is not None:
+                interceptors.append(DefaultClientInterceptorImpl(metadata))
+        elif metadata is not None:
+            interceptors = [DefaultClientInterceptorImpl(metadata)]
+        else:
+            interceptors = None
+
+        channel = shared.get_grpc_channel(
+            host_address=host_address,
+            secure_channel=secure_channel,
+            interceptors=interceptors
+        )
         self._stub = stubs.TaskHubSidecarServiceStub(channel)
         self._logger = shared.get_logger("client", log_handler, log_formatter)
 
@@ -116,7 +134,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
             scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
             version=wrappers_pb2.StringValue(value=""),
             orchestrationIdReusePolicy=reuse_id_policy,
-            )
+        )
 
         self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
         res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
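To make the new construction paths concrete, here is a minimal usage sketch. `TaskHubGrpcClient` is the class this `__init__` belongs to in `durabletask/client.py`; the addresses and header values are placeholders.

```python
from durabletask.client import TaskHubGrpcClient
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

# Metadata only: __init__ wraps the pairs in a DefaultClientInterceptorImpl
# (the elif branch in the diff above).
client = TaskHubGrpcClient(
    host_address="localhost:4001",
    metadata=[("authorization", "Bearer <token>")],
)

# Explicit interceptors: if metadata is also supplied, its interceptor is
# appended to a copy of the sequence, so the caller's list is never mutated.
ua = DefaultClientInterceptorImpl([("user-agent", "my-app/1.0")])
client2 = TaskHubGrpcClient(
    host_address="localhost:4001",
    interceptors=[ua],
)
```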
durabletask/internal/grpc_interceptor.py (12 changes: 6 additions & 6 deletions; whitespace and indentation only)

@@ -19,28 +19,28 @@ class _ClientCallDetails(
 
 
 class DefaultClientInterceptorImpl (
-        grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
-        grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
+    grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
+    grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
     """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
-    StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
+    StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
     interceptor to add additional headers to all calls as needed."""
 
     def __init__(self, metadata: list[tuple[str, str]]):
         super().__init__()
         self._metadata = metadata
 
     def _intercept_call(
-            self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
+        self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
         """Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
         call details."""
         if self._metadata is None:
             return client_call_details
 
         if client_call_details.metadata is not None:
             metadata = list(client_call_details.metadata)
         else:
             metadata = []
 
         metadata.extend(self._metadata)
         client_call_details = _ClientCallDetails(
             client_call_details.method, client_call_details.timeout, metadata,
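The reindented `_intercept_call` is easiest to follow in isolation: existing per-call metadata is preserved, and the interceptor's own pairs are appended after it. A plain-Python rendering of that merge, with no real gRPC objects involved:

```python
from typing import Optional

# Mirrors the merge in _intercept_call using plain lists instead of
# grpc.ClientCallDetails.
def merge_metadata(call_metadata: Optional[list[tuple[str, str]]],
                   interceptor_metadata: list[tuple[str, str]]) -> list[tuple[str, str]]:
    metadata = list(call_metadata) if call_metadata is not None else []
    metadata.extend(interceptor_metadata)  # interceptor headers go last
    return metadata

merged = merge_metadata(
    [("x-request-id", "abc123")],            # metadata already on the call
    [("user-agent", "durabletask-python")],  # metadata held by the interceptor
)
assert merged == [("x-request-id", "abc123"), ("user-agent", "durabletask-python")]
```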
durabletask/internal/shared.py (21 changes: 14 additions & 7 deletions)

@@ -5,11 +5,16 @@
 import json
 import logging
 from types import SimpleNamespace
-from typing import Any, Optional
+from typing import Any, Optional, Sequence, Union
 
 import grpc
 
-from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
+ClientInterceptor = Union[
+    grpc.UnaryUnaryClientInterceptor,
+    grpc.UnaryStreamClientInterceptor,
+    grpc.StreamUnaryClientInterceptor,
+    grpc.StreamStreamClientInterceptor
+]
 
 # Field name used to indicate that an object was automatically serialized
 # and should be deserialized as a SimpleNamespace
@@ -25,8 +30,8 @@ def get_default_host_address() -> str:
 
 def get_grpc_channel(
         host_address: Optional[str],
-        metadata: Optional[list[tuple[str, str]]],
-        secure_channel: bool = False) -> grpc.Channel:
+        secure_channel: bool = False,
+        interceptors: Optional[Sequence[ClientInterceptor]] = None) -> grpc.Channel:
     if host_address is None:
         host_address = get_default_host_address()
 
@@ -44,16 +49,18 @@
             host_address = host_address[len(protocol):]
             break
 
+    # Create the base channel
     if secure_channel:
         channel = grpc.secure_channel(host_address, grpc.ssl_channel_credentials())
     else:
         channel = grpc.insecure_channel(host_address)
 
-    if metadata is not None and len(metadata) > 0:
-        interceptors = [DefaultClientInterceptorImpl(metadata)]
+    # Apply interceptors ONLY if they exist
+    if interceptors:
         channel = grpc.intercept_channel(channel, *interceptors)
     return channel
 
 
 def get_logger(
         name_suffix: str,
         log_handler: Optional[logging.Handler] = None,
@@ -98,7 +105,7 @@ def default(self, obj):
     if dataclasses.is_dataclass(obj):
         # Dataclasses are not serializable by default, so we convert them to a dict and mark them for
         # automatic deserialization by the receiver
-        d = dataclasses.asdict(obj) # type: ignore
+        d = dataclasses.asdict(obj)  # type: ignore
         d[AUTO_SERIALIZED] = True
         return d
     elif isinstance(obj, SimpleNamespace):
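A call-site sketch of the reworked helper follows. The host address and header value are placeholders, and the exact set of protocol prefixes the stripping loop recognizes is not visible in this excerpt.

```python
import durabletask.internal.shared as shared
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

ua_interceptor = DefaultClientInterceptorImpl([("user-agent", "my-app/1.0")])

channel = shared.get_grpc_channel(
    host_address="localhost:4001",   # a protocol prefix, if present, is stripped
    secure_channel=False,            # True would use grpc.ssl_channel_credentials()
    interceptors=[ua_interceptor],   # wrapped via grpc.intercept_channel only when non-empty
)
```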
durabletask/task.py (6 changes: 3 additions & 3 deletions; whitespace only)

@@ -333,7 +333,7 @@ class RetryableTask(CompletableTask[T]):
     """A task that can be retried according to a retry policy."""
 
     def __init__(self, retry_policy: RetryPolicy, action: pb.OrchestratorAction,
-                 start_time:datetime, is_sub_orch: bool) -> None:
+                 start_time: datetime, is_sub_orch: bool) -> None:
         super().__init__()
         self._action = action
         self._retry_policy = retry_policy
@@ -343,15 +343,15 @@ def __init__(self, retry_policy: RetryPolicy, action: pb.OrchestratorAction,
 
     def increment_attempt_count(self) -> None:
         self._attempt_count += 1
 
     def compute_next_delay(self) -> Optional[timedelta]:
         if self._attempt_count >= self._retry_policy.max_number_of_attempts:
             return None
 
         retry_expiration: datetime = datetime.max
         if self._retry_policy.retry_timeout is not None and self._retry_policy.retry_timeout != datetime.max:
             retry_expiration = self._start_time + self._retry_policy.retry_timeout
 
         if self._retry_policy.backoff_coefficient is None:
             backoff_coefficient = 1.0
         else:
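The retry logic above bottoms out in a delay calculation. The following self-contained sketch shows one plausible formulation; the `first_retry_interval` field and the exponential formula are assumptions, since neither appears in the visible hunk.

```python
from datetime import timedelta
from typing import Optional

def next_delay(first_retry_interval: timedelta,   # assumed RetryPolicy field
               backoff_coefficient: Optional[float],
               attempt_count: int,
               max_number_of_attempts: int) -> Optional[timedelta]:
    # Matches the early return above: stop once the attempt budget is spent.
    if attempt_count >= max_number_of_attempts:
        return None
    # Matches the default above: a missing coefficient means no backoff growth.
    coefficient = 1.0 if backoff_coefficient is None else backoff_coefficient
    # Assumed exponential backoff: the first retry waits the base interval,
    # each later retry multiplies it by the coefficient again.
    return first_retry_interval * (coefficient ** (attempt_count - 1))

# With a 1-second base and coefficient 2.0: 1s, 2s, 4s, then None (give up).
for attempt in range(1, 5):
    print(next_delay(timedelta(seconds=1), 2.0, attempt, max_number_of_attempts=4))
```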