Skip to content

Commit

Permalink
Syncs with upstream (bumps min python version, regenerates protos wit…
Browse files Browse the repository at this point in the history
…h an older grpcio-tools version) (#6)

* Update version to 0.2b1, require Python 3.9+, and enhance GitHub Actions workflow (#1) (microsoft#35)

- Bump version in `pyproject.toml` to 0.2b1 and update Python requirement to >=3.9.
- Add `protobuf` dependency in `requirements.txt`.
- Update GitHub Actions workflow to support Python versions 3.9 to 3.13 and upgrade action versions.
- Refactor type hints in various files to use `Optional` and `list` instead of `Union` and `List`.
- Improve handling of custom status in orchestration context and related functions.
- Fix purge implementation to pass required parameters.

Signed-off-by: Elena Kolevska <[email protected]>

* Downgrade required `grpcio` and `protobuf` versions (microsoft#36)

Signed-off-by: Elena Kolevska <[email protected]>

---------

Signed-off-by: Elena Kolevska <[email protected]>
Co-authored-by: Chris Gillum <[email protected]>
Co-authored-by: Bernd Verst <[email protected]>
  • Loading branch information
3 people authored Jan 13, 2025
1 parent 4fc38e2 commit 60c4633
Show file tree
Hide file tree
Showing 21 changed files with 529 additions and 757 deletions.
17 changes: 14 additions & 3 deletions .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand All @@ -36,6 +36,17 @@ jobs:
- name: Pytest unit tests
run: |
pytest -m "not e2e" --verbose
# Sidecar for running e2e tests requires Go SDK
- name: Install Go SDK
uses: actions/setup-go@v5
with:
go-version: 'stable'
# Install and run the durabletask-go sidecar for running e2e tests
- name: Pytest e2e tests
run: |
go install github.com/microsoft/durabletask-go@main
durabletask-go --port 4001 &
pytest -m "e2e" --verbose
publish:
needs: build
if: startswith(github.ref, 'refs/tags/v')
Expand All @@ -59,4 +70,4 @@ jobs:
TWINE_PASSWORD: ${{ secrets.PYPI_UPLOAD_PASS }}
run: |
python -m build
twine upload dist/*
twine upload dist/*
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"editor.defaultFormatter": "ms-python.autopep8",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true,
"source.organizeImports": "explicit"
},
"editor.rulers": [
119
Expand All @@ -29,5 +29,6 @@
"coverage.xml",
"jacoco.xml",
"coverage.cobertura.xml"
]
],
"makefile.configureOnOpen": false
}
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `set_custom_status` orchestrator API ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
- Added `purge_orchestration` client API ([#34](https://github.com/microsoft/durabletask-python/pull/34)) - contributed by [@famarting](https://github.com/famarting)

### Changes

- Protos are compiled with gRPC 1.62.3 / protobuf 3.25.X instead of the latest release. This ensures compatibility with a wider range of grpcio versions for better compatibility with other packages / libraries.

### Updates

- Updated `durabletask-protobuf` submodule reference to latest
Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ install:
python3 -m pip install .

gen-proto:
# NOTE: There is currently a hand-edit that we make to the generated orchestrator_service_pb2.py file after it's generated to help resolve import problems.
python3 -m grpc_tools.protoc --proto_path=./submodules/durabletask-protobuf/protos --python_out=./durabletask/internal --pyi_out=./durabletask/internal --grpc_python_out=./durabletask/internal orchestrator_service.proto
cp ./submodules/durabletask-protobuf/protos/orchestrator_service.proto durabletask/internal/orchestrator_service.proto
python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
rm durabletask/internal/*.proto

.PHONY: init test-unit test-e2e gen-proto install
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ git submodule update --init
Once the submodule is available, the corresponding source code can be regenerated using the following command from the project root:

```sh
pip3 install -r dev-requirements.txt
make gen-proto
```

Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python
40 changes: 20 additions & 20 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, List, Tuple, TypeVar, Union
from typing import Any, Optional, TypeVar, Union

import grpc
from google.protobuf import wrappers_pb2
Expand Down Expand Up @@ -42,10 +42,10 @@ class OrchestrationState:
runtime_status: OrchestrationStatus
created_at: datetime
last_updated_at: datetime
serialized_input: Union[str, None]
serialized_output: Union[str, None]
serialized_custom_status: Union[str, None]
failure_details: Union[task.FailureDetails, None]
serialized_input: Optional[str]
serialized_output: Optional[str]
serialized_custom_status: Optional[str]
failure_details: Optional[task.FailureDetails]

def raise_if_failed(self):
if self.failure_details is not None:
Expand All @@ -64,7 +64,7 @@ def failure_details(self):
return self._failure_details


def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Union[OrchestrationState, None]:
def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Optional[OrchestrationState]:
if not res.exists:
return None

Expand Down Expand Up @@ -92,20 +92,20 @@ def new_orchestration_state(instance_id: str, res: pb.GetInstanceResponse) -> Un
class TaskHubGrpcClient:

def __init__(self, *,
host_address: Union[str, None] = None,
metadata: Union[List[Tuple[str, str]], None] = None,
log_handler = None,
log_formatter: Union[logging.Formatter, None] = None,
host_address: Optional[str] = None,
metadata: Optional[list[tuple[str, str]]] = None,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False):
channel = shared.get_grpc_channel(host_address, metadata, secure_channel=secure_channel)
self._stub = stubs.TaskHubSidecarServiceStub(channel)
self._logger = shared.get_logger("client", log_handler, log_formatter)

def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
input: Union[TInput, None] = None,
instance_id: Union[str, None] = None,
start_at: Union[datetime, None] = None,
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None) -> str:
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
start_at: Optional[datetime] = None,
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -122,14 +122,14 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId

def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Union[OrchestrationState, None]:
def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
res: pb.GetInstanceResponse = self._stub.GetInstance(req)
return new_orchestration_state(req.instanceId, res)

def wait_for_orchestration_start(self, instance_id: str, *,
fetch_payloads: bool = False,
timeout: int = 60) -> Union[OrchestrationState, None]:
timeout: int = 60) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to start.")
Expand All @@ -144,7 +144,7 @@ def wait_for_orchestration_start(self, instance_id: str, *,

def wait_for_orchestration_completion(self, instance_id: str, *,
fetch_payloads: bool = True,
timeout: int = 60) -> Union[OrchestrationState, None]:
timeout: int = 60) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
self._logger.info(f"Waiting {timeout}s for instance '{instance_id}' to complete.")
Expand All @@ -170,7 +170,7 @@ def wait_for_orchestration_completion(self, instance_id: str, *,
raise

def raise_orchestration_event(self, instance_id: str, event_name: str, *,
data: Union[Any, None] = None):
data: Optional[Any] = None):
req = pb.RaiseEventRequest(
instanceId=instance_id,
name=event_name,
Expand All @@ -180,7 +180,7 @@ def raise_orchestration_event(self, instance_id: str, event_name: str, *,
self._stub.RaiseEvent(req)

def terminate_orchestration(self, instance_id: str, *,
output: Union[Any, None] = None,
output: Optional[Any] = None,
recursive: bool = True):
req = pb.TerminateRequest(
instanceId=instance_id,
Expand All @@ -203,4 +203,4 @@ def resume_orchestration(self, instance_id: str):
def purge_orchestration(self, instance_id: str, recursive: bool = True):
req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
self._logger.info(f"Purging instance '{instance_id}'.")
self._stub.PurgeInstances()
self._stub.PurgeInstances(req)
Empty file removed durabletask/internal/__init__.py
Empty file.
3 changes: 1 addition & 2 deletions durabletask/internal/grpc_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Licensed under the MIT License.

from collections import namedtuple
from typing import List, Tuple

import grpc

Expand All @@ -26,7 +25,7 @@ class DefaultClientInterceptorImpl (
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
interceptor to add additional headers to all calls as needed."""

def __init__(self, metadata: List[Tuple[str, str]]):
def __init__(self, metadata: list[tuple[str, str]]):
super().__init__()
self._metadata = metadata

Expand Down
32 changes: 16 additions & 16 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import traceback
from datetime import datetime
from typing import List, Union
from typing import Optional

from google.protobuf import timestamp_pb2, wrappers_pb2

Expand All @@ -12,14 +12,14 @@
# TODO: The new_xxx_event methods are only used by test code and should be moved elsewhere


def new_orchestrator_started_event(timestamp: Union[datetime, None] = None) -> pb.HistoryEvent:
def new_orchestrator_started_event(timestamp: Optional[datetime] = None) -> pb.HistoryEvent:
ts = timestamp_pb2.Timestamp()
if timestamp is not None:
ts.FromDatetime(timestamp)
return pb.HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb.OrchestratorStartedEvent())


def new_execution_started_event(name: str, instance_id: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand Down Expand Up @@ -49,15 +49,15 @@ def new_timer_fired_event(timer_id: int, fire_at: datetime) -> pb.HistoryEvent:
)


def new_task_scheduled_event(event_id: int, name: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_task_scheduled_event(event_id: int, name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=event_id,
timestamp=timestamp_pb2.Timestamp(),
taskScheduled=pb.TaskScheduledEvent(name=name, input=get_string_value(encoded_input))
)


def new_task_completed_event(event_id: int, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_task_completed_event(event_id: int, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -77,7 +77,7 @@ def new_sub_orchestration_created_event(
event_id: int,
name: str,
instance_id: str,
encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=event_id,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -88,7 +88,7 @@ def new_sub_orchestration_created_event(
)


def new_sub_orchestration_completed_event(event_id: int, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_sub_orchestration_completed_event(event_id: int, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand Down Expand Up @@ -116,7 +116,7 @@ def new_failure_details(ex: Exception) -> pb.TaskFailureDetails:
)


def new_event_raised_event(name: str, encoded_input: Union[str, None] = None) -> pb.HistoryEvent:
def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -140,7 +140,7 @@ def new_resume_event() -> pb.HistoryEvent:
)


def new_terminated_event(*, encoded_output: Union[str, None] = None) -> pb.HistoryEvent:
def new_terminated_event(*, encoded_output: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
Expand All @@ -150,7 +150,7 @@ def new_terminated_event(*, encoded_output: Union[str, None] = None) -> pb.Histo
)


def get_string_value(val: Union[str, None]) -> Union[wrappers_pb2.StringValue, None]:
def get_string_value(val: Optional[str]) -> Optional[wrappers_pb2.StringValue]:
if val is None:
return None
else:
Expand All @@ -160,9 +160,9 @@ def get_string_value(val: Union[str, None]) -> Union[wrappers_pb2.StringValue, N
def new_complete_orchestration_action(
id: int,
status: pb.OrchestrationStatus,
result: Union[str, None] = None,
failure_details: Union[pb.TaskFailureDetails, None] = None,
carryover_events: Union[List[pb.HistoryEvent], None] = None) -> pb.OrchestratorAction:
result: Optional[str] = None,
failure_details: Optional[pb.TaskFailureDetails] = None,
carryover_events: Optional[list[pb.HistoryEvent]] = None) -> pb.OrchestratorAction:
completeOrchestrationAction = pb.CompleteOrchestrationAction(
orchestrationStatus=status,
result=get_string_value(result),
Expand All @@ -178,7 +178,7 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
return pb.OrchestratorAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp))


def new_schedule_task_action(id: int, name: str, encoded_input: Union[str, None]) -> pb.OrchestratorAction:
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
name=name,
input=get_string_value(encoded_input)
Expand All @@ -194,8 +194,8 @@ def new_timestamp(dt: datetime) -> timestamp_pb2.Timestamp:
def new_create_sub_orchestration_action(
id: int,
name: str,
instance_id: Union[str, None],
encoded_input: Union[str, None]) -> pb.OrchestratorAction:
instance_id: Optional[str],
encoded_input: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
name=name,
instanceId=instance_id,
Expand Down
386 changes: 188 additions & 198 deletions durabletask/internal/orchestrator_service_pb2.py

Large diffs are not rendered by default.

Loading

0 comments on commit 60c4633

Please sign in to comment.