Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

CHANGED

- Add/update type-hinting for various worker methods

## v1.2.0

ADDED:
Expand Down
5 changes: 4 additions & 1 deletion durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ def purge_orchestration(self, instance_id: str, recursive: bool = True):
self._logger.info(f"Purging instance '{instance_id}'.")
self._stub.PurgeInstances(req)

def signal_entity(self, entity_instance_id: EntityInstanceId, operation_name: str, input: Optional[Any] = None):
def signal_entity(self,
entity_instance_id: EntityInstanceId,
operation_name: str,
input: Optional[Any] = None) -> None:
req = pb.SignalEntityRequest(
instanceId=str(entity_instance_id),
name=operation_name,
Expand Down
6 changes: 3 additions & 3 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
def call_entity(self,
entity: EntityInstanceId,
operation: str,
input: Optional[TInput] = None) -> CompletableTask:
input: Optional[TInput] = None) -> CompletableTask[Any]:
"""Schedule entity function for execution.

Parameters
Expand Down Expand Up @@ -538,8 +538,8 @@ def task_id(self) -> int:
return self._task_id


# Orchestrators are generators that yield tasks and receive/return any type
Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]]
# Orchestrators are generators that yield tasks, receive any type, and return TOutput
Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task[Any], Any, TOutput], TOutput]]

# Activities are simple functions that can be scheduled by orchestrators
Activity = Callable[[ActivityContext, TInput], TOutput]
Expand Down
16 changes: 8 additions & 8 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,42 +150,42 @@ def __init__(self):
self.entities = {}
self.entity_instances = {}

def add_orchestrator(self, fn: task.Orchestrator) -> str:
def add_orchestrator(self, fn: task.Orchestrator[TInput, TOutput]) -> str:
if fn is None:
raise ValueError("An orchestrator function argument is required.")

name = task.get_name(fn)
self.add_named_orchestrator(name, fn)
return name

def add_named_orchestrator(self, name: str, fn: task.Orchestrator) -> None:
def add_named_orchestrator(self, name: str, fn: task.Orchestrator[TInput, TOutput]) -> None:
if not name:
raise ValueError("A non-empty orchestrator name is required.")
if name in self.orchestrators:
raise ValueError(f"A '{name}' orchestrator already exists.")

self.orchestrators[name] = fn

def get_orchestrator(self, name: str) -> Optional[task.Orchestrator]:
def get_orchestrator(self, name: str) -> Optional[task.Orchestrator[Any, Any]]:
return self.orchestrators.get(name)

def add_activity(self, fn: task.Activity) -> str:
def add_activity(self, fn: task.Activity[TInput, TOutput]) -> str:
if fn is None:
raise ValueError("An activity function argument is required.")

name = task.get_name(fn)
self.add_named_activity(name, fn)
return name

def add_named_activity(self, name: str, fn: task.Activity) -> None:
def add_named_activity(self, name: str, fn: task.Activity[TInput, TOutput]) -> None:
if not name:
raise ValueError("A non-empty activity name is required.")
if name in self.activities:
raise ValueError(f"A '{name}' activity already exists.")

self.activities[name] = fn

def get_activity(self, name: str) -> Optional[task.Activity]:
def get_activity(self, name: str) -> Optional[task.Activity[Any, Any]]:
return self.activities.get(name)

def add_entity(self, fn: task.Entity) -> str:
Expand Down Expand Up @@ -362,7 +362,7 @@ def __enter__(self):
def __exit__(self, type, value, traceback):
self.stop()

def add_orchestrator(self, fn: task.Orchestrator) -> str:
def add_orchestrator(self, fn: task.Orchestrator[TInput, TOutput]) -> str:
"""Registers an orchestrator function with the worker."""
if self._is_running:
raise RuntimeError(
Expand Down Expand Up @@ -1047,7 +1047,7 @@ def call_entity(
entity: EntityInstanceId,
operation: str,
input: Optional[TInput] = None,
) -> task.CompletableTask:
) -> task.CompletableTask[Any]:
id = self.next_sequence_number()

self.call_entity_function_helper(
Expand Down
Loading