Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cadence/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

# Import main client functionality
from .client import Client
from .worker import Registry
from . import workflow

__version__ = "0.1.0"

__all__ = [
"Client",
"Registry",
"workflow",
]
46 changes: 18 additions & 28 deletions cadence/_internal/workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ class DecisionResult:
decisions: list[Decision]

class WorkflowEngine:
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None):
def __init__(self, info: WorkflowInfo, client: Client, workflow_definition=None):
self._context = Context(client, info)
self._workflow_func = workflow_func
self._workflow_definition = workflow_definition
self._workflow_instance = None
if workflow_definition:
self._workflow_instance = workflow_definition.cls()
self._decision_manager = DecisionManager()
self._decisions_helper = DecisionsHelper(self._decision_manager)
self._is_workflow_complete = False
Expand Down Expand Up @@ -250,19 +253,17 @@ def _fallback_process_workflow_history(self, history) -> None:
async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None:
"""
Execute the workflow function to generate new decisions.

This blocks until the workflow schedules an activity or completes.

Args:
decision_task: The decision task containing workflow context
"""
try:
# Execute the workflow function
# The workflow function should block until it schedules an activity
workflow_func = self._workflow_func
if workflow_func is None:
# Execute the workflow function from the workflow instance
if self._workflow_definition is None or self._workflow_instance is None:
logger.warning(
"No workflow function available",
"No workflow definition or instance available",
extra={
"workflow_type": self._context.info().workflow_type,
"workflow_id": self._context.info().workflow_id,
Expand All @@ -271,11 +272,14 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
)
return

# Get the workflow run method from the instance
workflow_func = self._workflow_definition.get_run_method(self._workflow_instance)

# Extract workflow input from history
workflow_input = await self._extract_workflow_input(decision_task)

# Execute workflow function
result = self._execute_workflow_function_once(workflow_func, workflow_input)
result = await self._execute_workflow_function_once(workflow_func, workflow_input)

# Check if workflow is complete
if result is not None:
Expand All @@ -290,7 +294,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
"completion_type": "success"
}
)

except Exception as e:
logger.error(
"Error executing workflow function",
Expand Down Expand Up @@ -337,7 +341,7 @@ async def _extract_workflow_input(self, decision_task: PollForDecisionTaskRespon
logger.warning("No WorkflowExecutionStarted event found in history")
return None

def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any:
async def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any:
"""
Execute the workflow function once (not during replay).

Expand All @@ -351,23 +355,9 @@ def _execute_workflow_function_once(self, workflow_func: Callable, workflow_inpu
logger.debug(f"Executing workflow function with input: {workflow_input}")
result = workflow_func(workflow_input)

# If the workflow function is async, we need to handle it properly
# If the workflow function is async, await it properly
if asyncio.iscoroutine(result):
# For now, use asyncio.run for async workflow functions
# TODO: Implement proper deterministic event loop for workflow execution
try:
result = asyncio.run(result)
except RuntimeError:
# If we're already in an event loop, create a new task
loop = asyncio.get_event_loop()
if loop.is_running():
# We can't use asyncio.run inside a running loop
# For now, just get the result (this may not be deterministic)
logger.warning("Async workflow function called within running event loop - may not be deterministic")
# This is a workaround - in a real implementation, we'd need proper task scheduling
result = None
else:
result = loop.run_until_complete(result)
result = await result

return result

Expand Down
8 changes: 4 additions & 4 deletions cadence/worker/_decision_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
)

try:
workflow_func = self._registry.get_workflow(workflow_type_name)
workflow_definition = self._registry.get_workflow(workflow_type_name)
except KeyError:
logger.error(
"Workflow type not found in registry",
Expand All @@ -103,9 +103,9 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
workflow_engine = self._workflow_engines.get(cache_key)
if workflow_engine is None:
workflow_engine = WorkflowEngine(
info=workflow_info,
client=self._client,
workflow_func=workflow_func
info=workflow_info,
client=self._client,
workflow_definition=workflow_definition
)
self._workflow_engines[cache_key] = workflow_engine

Expand Down
76 changes: 41 additions & 35 deletions cadence/worker/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
"""

import logging
from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload
from typing import Callable, Dict, Optional, Unpack, TypedDict, overload, Type, Union, TypeVar
from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions

logger = logging.getLogger(__name__)

# TypeVar for workflow class types
W = TypeVar('W')


class RegisterWorkflowOptions(TypedDict, total=False):
"""Options for registering a workflow."""
Expand All @@ -28,53 +32,58 @@ class Registry:

def __init__(self) -> None:
"""Initialize the registry."""
self._workflows: Dict[str, Callable] = {}
self._workflows: Dict[str, WorkflowDefinition] = {}
self._activities: Dict[str, ActivityDefinition] = {}
self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping

def workflow(
self,
func: Optional[Callable] = None,
cls: Optional[Type[W]] = None,
**kwargs: Unpack[RegisterWorkflowOptions]
) -> Callable:
) -> Union[Type[W], Callable[[Type[W]], Type[W]]]:
"""
Register a workflow function.
Register a workflow class.

This method can be used as a decorator or called directly.

Only supports class-based workflows.

Args:
func: The workflow function to register
cls: The workflow class to register
**kwargs: Options for registration (name, alias)

Returns:
The decorated function or the function itself
The decorated class

Raises:
KeyError: If workflow name already exists
ValueError: If class workflow is invalid
"""
options = RegisterWorkflowOptions(**kwargs)
def decorator(f: Callable) -> Callable:
workflow_name = options.get('name') or f.__name__

def decorator(target: Type[W]) -> Type[W]:
workflow_name = options.get('name') or target.__name__

if workflow_name in self._workflows:
raise KeyError(f"Workflow '{workflow_name}' is already registered")

self._workflows[workflow_name] = f


# Create WorkflowDefinition with type information
workflow_opts = WorkflowDefinitionOptions(name=workflow_name)
workflow_def = WorkflowDefinition.wrap(target, workflow_opts)
self._workflows[workflow_name] = workflow_def

# Register alias if provided
alias = options.get('alias')
if alias:
if alias in self._workflow_aliases:
raise KeyError(f"Workflow alias '{alias}' is already registered")
self._workflow_aliases[alias] = workflow_name

logger.info(f"Registered workflow '{workflow_name}'")
return f
if func is None:
return target

if cls is None:
return decorator
return decorator(func)
return decorator(cls)

@overload
def activity(self, func: Callable[P, T]) -> ActivityDefinition[P, T]:
Expand Down Expand Up @@ -135,25 +144,25 @@ def _register_activity(self, defn: ActivityDefinition) -> None:
self._activities[defn.name] = defn


def get_workflow(self, name: str) -> Callable:
def get_workflow(self, name: str) -> WorkflowDefinition:
"""
Get a registered workflow by name.

Args:
name: Name or alias of the workflow

Returns:
The workflow function
The workflow definition

Raises:
KeyError: If workflow is not found
"""
# Check if it's an alias
actual_name = self._workflow_aliases.get(name, name)

if actual_name not in self._workflows:
raise KeyError(f"Workflow '{name}' not found in registry")

return self._workflows[actual_name]

def get_activity(self, name: str) -> ActivityDefinition:
Expand Down Expand Up @@ -188,7 +197,7 @@ def of(*args: 'Registry') -> 'Registry':

return result

def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]:
def _find_activity_definitions(instance: object) -> list[ActivityDefinition]:
attr_to_def = {}
for t in instance.__class__.__mro__:
for attr in dir(t):
Expand All @@ -200,10 +209,7 @@ def _find_activity_definitions(instance: object) -> Sequence[ActivityDefinition]
raise ValueError(f"'{attr}' was overridden with a duplicate activity definition")
attr_to_def[attr] = value

# Create new definitions, copying the attributes from the declaring type but using the function
# from the specific object. This allows for the decorator to be applied to the base class and the
# function to be overridden
result = []
result: list[ActivityDefinition] = []
for attr, definition in attr_to_def.items():
result.append(ActivityDefinition(getattr(instance, attr), definition.name, definition.strategy, definition.params))

Expand Down
Loading