Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
input: Optional[TInput] = None,
retry_policy: Optional[RetryPolicy] = None,
tags: Optional[dict[str, str]] = None) -> Task[TOutput]:
tags: Optional[dict[str, str]] = None) -> CompletableTask[TOutput]:
"""Schedule an activity for execution.

Parameters
Expand All @@ -142,7 +142,7 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
def call_entity(self,
entity: EntityInstanceId,
operation: str,
input: Optional[TInput] = None) -> Task:
input: Optional[TInput] = None) -> CompletableTask:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this also have been CompletableTask[TOutput]?

Suggested change
input: Optional[TInput] = None) -> CompletableTask:
input: Optional[TInput] = None) -> CompletableTask[TOutput]:

"""Schedule entity function for execution.

Parameters
Expand Down Expand Up @@ -182,7 +182,7 @@ def signal_entity(
pass

@abstractmethod
def lock_entities(self, entities: list[EntityInstanceId]) -> Task[EntityLock]:
def lock_entities(self, entities: list[EntityInstanceId]) -> CompletableTask[EntityLock]:
"""Creates a Task object that locks the specified entity instances.

The locks will be acquired the next time the orchestrator yields.
Expand All @@ -206,7 +206,7 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput
input: Optional[TInput] = None,
instance_id: Optional[str] = None,
retry_policy: Optional[RetryPolicy] = None,
version: Optional[str] = None) -> Task[TOutput]:
version: Optional[str] = None) -> CompletableTask[TOutput]:
"""Schedule sub-orchestrator function for execution.

Parameters
Expand All @@ -231,7 +231,7 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput
# TOOD: Add a timeout parameter, which allows the task to be canceled if the event is
# not received within the specified timeout. This requires support for task cancellation.
@abstractmethod
def wait_for_external_event(self, name: str) -> Task:
def wait_for_external_event(self, name: str) -> CompletableTask:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly (here and other places in the changes)

"""Wait asynchronously for an event to be raised with the name `name`.

Parameters
Expand Down
12 changes: 6 additions & 6 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ def create_timer_internal(
self,
fire_at: Union[datetime, timedelta],
retryable_task: Optional[task.RetryableTask] = None,
) -> task.Task:
) -> task.TimerTask:
id = self.next_sequence_number()
if isinstance(fire_at, timedelta):
fire_at = self.current_utc_datetime + fire_at
Expand All @@ -1034,7 +1034,7 @@ def call_activity(
input: Optional[TInput] = None,
retry_policy: Optional[task.RetryPolicy] = None,
tags: Optional[dict[str, str]] = None,
) -> task.Task[TOutput]:
) -> task.CompletableTask[TOutput]:
id = self.next_sequence_number()

self.call_activity_function_helper(
Expand All @@ -1047,7 +1047,7 @@ def call_entity(
entity: EntityInstanceId,
operation: str,
input: Optional[TInput] = None,
) -> task.Task:
) -> task.CompletableTask:
id = self.next_sequence_number()

self.call_entity_function_helper(
Expand All @@ -1068,7 +1068,7 @@ def signal_entity(
id, entity_id, operation_name, input
)

def lock_entities(self, entities: list[EntityInstanceId]) -> task.Task[EntityLock]:
def lock_entities(self, entities: list[EntityInstanceId]) -> task.CompletableTask[EntityLock]:
id = self.next_sequence_number()

self.lock_entities_function_helper(
Expand All @@ -1084,7 +1084,7 @@ def call_sub_orchestrator(
instance_id: Optional[str] = None,
retry_policy: Optional[task.RetryPolicy] = None,
version: Optional[str] = None,
) -> task.Task[TOutput]:
) -> task.CompletableTask[TOutput]:
id = self.next_sequence_number()
if isinstance(orchestrator, str):
orchestrator_name = orchestrator
Expand Down Expand Up @@ -1229,7 +1229,7 @@ def _exit_critical_section(self) -> None:
action = pb.OrchestratorAction(id=task_id, sendEntityMessage=entity_unlock_message)
self._pending_actions[task_id] = action

def wait_for_external_event(self, name: str) -> task.Task:
def wait_for_external_event(self, name: str) -> task.CompletableTask:
# Check to see if this event has already been received, in which case we
# can return it immediately. Otherwise, record out intent to receive an
# event with the given name so that we can resume the generator when it
Expand Down
Loading