Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncs with upstream (bumps min python version, regenerates protos with an older grpcio-tools version) #6

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand All @@ -36,6 +36,17 @@ jobs:
- name: Pytest unit tests
run: |
pytest -m "not e2e" --verbose
# Sidecar for running e2e tests requires Go SDK
- name: Install Go SDK
uses: actions/setup-go@v5
with:
go-version: 'stable'
# Install and run the durabletask-go sidecar for running e2e tests
- name: Pytest e2e tests
run: |
go install github.com/microsoft/durabletask-go@main
durabletask-go --port 4001 &
pytest -m "e2e" --verbose
publish:
needs: build
if: startswith(github.ref, 'refs/tags/v')
Expand All @@ -59,4 +70,4 @@ jobs:
TWINE_PASSWORD: ${{ secrets.PYPI_UPLOAD_PASS }}
run: |
python -m build
twine upload dist/*
twine upload dist/*
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"editor.defaultFormatter": "ms-python.autopep8",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true,
"source.organizeImports": "explicit"
},
"editor.rulers": [
119
Expand All @@ -29,5 +29,6 @@
"coverage.xml",
"jacoco.xml",
"coverage.cobertura.xml"
]
],
"makefile.configureOnOpen": false
}
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `set_custom_status` orchestrator API ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
- Added `purge_orchestration` client API ([#34](https://github.com/microsoft/durabletask-python/pull/34)) - contributed by [@famarting](https://github.com/famarting)

### Changes

- Protos are compiled with gRPC 1.62.3 / protobuf 3.25.X instead of the latest release. This ensures compatibility with a wider range of grpcio versions for better compatibility with other packages / libraries.

### Updates

- Updated `durabletask-protobuf` submodule reference to latest
Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ install:
python3 -m pip install .

gen-proto:
# NOTE: There is currently a hand-edit that we make to the generated orchestrator_service_pb2.py file after it's generated to help resolve import problems.
python3 -m grpc_tools.protoc --proto_path=./submodules/durabletask-protobuf/protos --python_out=./durabletask/internal --pyi_out=./durabletask/internal --grpc_python_out=./durabletask/internal orchestrator_service.proto
cp ./submodules/durabletask-protobuf/protos/orchestrator_service.proto durabletask/internal/orchestrator_service.proto
python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
rm durabletask/internal/*.proto

.PHONY: init test-unit test-e2e gen-proto install
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ git submodule update --init
Once the submodule is available, the corresponding source code can be regenerated using the following command from the project root:

```sh
pip3 install -r dev-requirements.txt
make gen-proto
```

Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python
40 changes: 20 additions & 20 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, List, Tuple, TypeVar, Union
from typing import Any, Optional, TypeVar, Union

import grpc
from google.protobuf import wrappers_pb2
Expand Down Expand Up @@ -42,10 +42,10 @@ class OrchestrationState:
runtime_status: OrchestrationStatus
created_at: datetime
last_updated_at: datetime
serialized_input: Union[str, None]
serialized_output: Union[str, None]
serialized_custom_status: Union[str, None]
failure_details: Union[task.FailureDetails, None]
serialized_input: Optional[str]
serialized_output: Optional[str]
serialized_custom_status: Optional[str]
failure_details: Optional[task.FailureDetails]

def raise_if_failed(self):
if self.failure_details is not None:
Expand All @@ -64,7 +64,7 @@ def failure_details(self):
return self._failure_details


def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Union[OrchestrationState, None]:
def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Optional[OrchestrationState]:
if not res.exists:
return None

Expand Down Expand Up @@ -92,20 +92,20 @@ def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Un
class TaskHubGrpcClient:

def __init__(self, *,
host_address: Union[str, None] = None,
metadata: Union[List[Tuple[str, str]], None] = None,
log_handler = None,
log_formatter: Union[logging.Formatter, None] = None,
host_address: Optional[str] = None,
metadata: Optional[list[tuple[str, str]]] = None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False):
channel = shared.get_grpc_channel(host_address, metadata, secure_channel=secure_channel)
self._stub = stubs.TaskHubSidecarServiceStub(channel)
self._logger = shared.get_logger("client", log_handler, log_formatter)

def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
input: Union[TInput, None] = None,
instance_id: Union[str, None] = None,
start_at: Union[datetime, None] = None,
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None) -> str:
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -122,14 +122,14 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId

def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Union[OrchestrationState, None]:
def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
res: pb.GetInstanceResponse = self._stub.GetInstance(req)
return new_orchestration_state(req.instanceId, res)

def wait_for_orchestration_start(self, instance_id: str, *,
fetch_payloads: bool = False,
timeout: int = 60) -> Union[OrchestrationState, None]:
timeout: int = 60) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to start.")
Expand All @@ -144,7 +144,7 @@ def wait_for_orchestration_start(self, instance_id: str, *,

def wait_for_orchestration_completion(self, instance_id: str, *,
fetch_payloads: bool = True,
timeout: int = 60) -> Union[OrchestrationState, None]:
timeout: int = 60) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to complete.")
Expand All @@ -170,7 +170,7 @@ def wait_for_orchestration_completion(self, instance_id: str, *,
raise

def raise_orchestration_event(self, instance_id: str, event_name: str, *,
data: Union[Any, None] = None):
data: Optional[Any] = None):
req = pb.RaiseEventRequest(
instanceId=instance_id,
name=event_name,
Expand All @@ -180,7 +180,7 @@ def raise_orchestration_event(self, instance_id: str, event_name: str, *,
self._stub.RaiseEvent(req)

def terminate_orchestration(self, instance_id: str, *,
output: Union[Any, None] = None,
output: Optional[Any] = None,
recursive: bool = True):
req = pb.TerminateRequest(
instanceId=instance_id,
Expand All @@ -203,4 +203,4 @@ def resume_orchestration(self, instance_id: str):
def purge_orchestration(self, instance_id: str, recursive: bool = True):
req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
self._logger.info(f"Purging instance '{instance_id}'.")
self._stub.PurgeInstances()
self._stub.PurgeInstances(req)
Empty file removed durabletask/internal/__init__.py
Empty file.
3 changes: 1 addition & 2 deletions durabletask/internal/grpc_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Licensed under the MIT License.

from collections import namedtuple
from typing import List, Tuple

import grpc

Expand All @@ -26,7 +25,7 @@ class DefaultClientInterceptorImpl (
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
interceptor to add additional headers to all calls as needed."""

def __init__(self, metadata: List[Tuple[str, str]]):
def __init__(self, metadata: list[tuple[str, str]]):
super().__init__()
self._metadata = metadata

Expand Down
32 changes: 16 additions & 16 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import traceback
from datetime import datetime
from typing import List, Union
from typing import Optional

from google.protobuf import timestamp_pb2, wrappers_pb2

Expand All @@ -12,14 +12,14 @@
# TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere


def new_orchestrator_started_event(timestamp: Union[datetime, None] = None) -> pb.HistoryEvent:
def new_orchestrator_started_event(timestamp: Optional[datetime] = None) -> pb.HistoryEvent:
ts = timestamp_pb2.Timestamp()
if timestamp is not None:
ts.FromDatetime(timestamp)
return pb.HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb.OrchestratorStartedEvent())


def new_execution_started_event(name: str, instance_id: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand Down Expand Up @@ -49,15 +49,15 @@ def new_timer_fired_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent:
)


def new_task_scheduled_event(event_id: int, name: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_task_scheduled_event(event_id: int, name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=event_id,
timestamp=timestamp_pb2.Timestamp(),
taskScheduled=pb.TaskScheduledEvent(name=name, input=get_string_value(encoded_input))
)


def new_task_completed_event(event_id: int, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_task_completed_event(event_id: int, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -77,7 +77,7 @@ def new_sub_orchestration_created_event(
event_id: int,
name: str,
instance_id: str,
encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=event_id,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -88,7 +88,7 @@ def new_sub_orchestration_created_event(
)


def new_sub_orchestration_completed_event(event_id: int, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_sub_orchestration_completed_event(event_id: int, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand Down Expand Up @@ -116,7 +116,7 @@ def new_failure_details(ex: Exception) -> pb.TaskFailureDetails:
)


def new_event_raised_event(name: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -140,7 +140,7 @@ def new_resume_event() -> pb.HistoryEvent:
)


def new_terminated_event(*, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_terminated_event(*, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -150,7 +150,7 @@ def new_terminated_event(*, encoded_output: Union[str, None] = None) -> pb.Histo
)


def get_string_value(val: Union[str, None]) -> Union[wrappers_pb2.StringValue, None]:
def get_string_value(val: Optional[str]) -> Optional[wrappers_pb2.StringValue]:
if val is None:
return None
else:
Expand All @@ -160,9 +160,9 @@ def get_string_value(val: Union[str, None]) -> Union[wrappers_pb2.StringValue, N
def new_complete_orchestration_action(
id: int,
status: pb.OrchestrationStatus,
result: Union[str, None] = None,
failure_details: Union[pb.TaskFailureDetails, None] = None,
carryover_events: Union[List[pb.HistoryEvent], None] = None) -> pb.OrchestratorAction:
result: Optional[str] = None,
failure_details: Optional[pb.TaskFailureDetails] = None,
carryover_events: Optional[list[pb.HistoryEvent]] = None) -> pb.OrchestratorAction:
completeOrchestrationAction = pb.CompleteOrchestrationAction(
orchestrationStatus=status,
result=get_string_value(result),
Expand All @@ -178,7 +178,7 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
return pb.OrchestratorAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp))


def new_schedule_task_action(id: int, name: str, encoded_input: Union[str, None]) -> pb.OrchestratorAction:
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
name=name,
input=get_string_value(encoded_input)
Expand All @@ -194,8 +194,8 @@ def new_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
def new_create_sub_orchestration_action(
id: int,
name: str,
instance_id: Union[str, None],
encoded_input: Union[str, None]) -> pb.OrchestratorAction:
instance_id: Optional[str],
encoded_input: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
name=name,
instanceId=instance_id,
Expand Down
386 changes: 188 additions & 198 deletions durabletask/internal/orchestrator_service_pb2.py

Large diffs are not rendered by default.

Loading
Loading