-
Notifications
You must be signed in to change notification settings - Fork 4
feat: Register workflow via WorkflowDefinition instead of raw callable #37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
adb578b
60b1444
53e2f3d
dae76e6
6bf973b
2d999a1
ca8fcd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,10 +2,118 @@ | |
| from contextlib import contextmanager | ||
| from contextvars import ContextVar | ||
| from dataclasses import dataclass | ||
| from typing import Iterator | ||
| from typing import Iterator, Callable, TypeVar, TypedDict, Type, cast, Any | ||
| from functools import wraps | ||
|
|
||
| from cadence.client import Client | ||
|
|
||
| T = TypeVar('T') | ||
|
|
||
|
|
||
| class WorkflowDefinitionOptions(TypedDict, total=False): | ||
| """Options for defining a workflow.""" | ||
| name: str | ||
|
|
||
|
|
||
| class WorkflowDefinition: | ||
| """ | ||
| Definition of a workflow class with metadata. | ||
| Similar to ActivityDefinition but for workflow classes. | ||
| Provides type safety and metadata for workflow classes. | ||
| """ | ||
|
|
||
| def __init__(self, cls: Type, name: str): | ||
| self._cls = cls | ||
| self._name = name | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| """Get the workflow name.""" | ||
| return self._name | ||
|
|
||
| @property | ||
| def cls(self) -> Type: | ||
| """Get the workflow class.""" | ||
| return self._cls | ||
|
|
||
| def get_run_method(self, instance: Any) -> Callable: | ||
natemort marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Get the workflow run method from an instance of the workflow class.""" | ||
| for attr_name in dir(instance): | ||
| if attr_name.startswith('_'): | ||
| continue | ||
| attr = getattr(instance, attr_name) | ||
| if callable(attr) and hasattr(attr, '_workflow_run'): | ||
| return cast(Callable, attr) | ||
| raise ValueError(f"No @workflow.run method found in class {self._cls.__name__}") | ||
|
|
||
| @staticmethod | ||
| def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition': | ||
| """ | ||
| Wrap a class as a WorkflowDefinition. | ||
| Args: | ||
| cls: The workflow class to wrap | ||
| opts: Options for the workflow definition | ||
| Returns: | ||
| A WorkflowDefinition instance | ||
| Raises: | ||
| ValueError: If no run method is found or multiple run methods exist | ||
| """ | ||
| name = cls.__name__ | ||
| if "name" in opts and opts["name"]: | ||
| name = opts["name"] | ||
|
|
||
| # Validate that the class has exactly one run method | ||
| run_method_count = 0 | ||
| for attr_name in dir(cls): | ||
| if attr_name.startswith('_'): | ||
| continue | ||
|
|
||
| attr = getattr(cls, attr_name) | ||
| if not callable(attr): | ||
| continue | ||
|
|
||
| # Check for workflow run method | ||
| if hasattr(attr, '_workflow_run'): | ||
| run_method_count += 1 | ||
|
|
||
| if run_method_count == 0: | ||
| raise ValueError(f"No @workflow.run method found in class {cls.__name__}") | ||
| elif run_method_count > 1: | ||
| raise ValueError(f"Multiple @workflow.run methods found in class {cls.__name__}") | ||
|
|
||
| return WorkflowDefinition(cls, name) | ||
|
|
||
|
|
||
| def run(func: Callable[..., T]) -> Callable[..., T]: | ||
|
||
| """ | ||
| Decorator to mark a method as the main workflow run method. | ||
| Args: | ||
| func: The method to mark as the workflow run method | ||
| Returns: | ||
| The decorated method with workflow run metadata | ||
| """ | ||
| @wraps(func) | ||
| def wrapper(*args, **kwargs): | ||
| return func(*args, **kwargs) | ||
|
|
||
| # Attach metadata to the function | ||
|
||
| wrapper._workflow_run = True # type: ignore | ||
|
||
| return wrapper | ||
|
|
||
|
|
||
| # Create a simple namespace object for the workflow decorators | ||
| class _WorkflowNamespace: | ||
|
||
| run = staticmethod(run) | ||
|
|
||
| workflow = _WorkflowNamespace() | ||
|
|
||
|
|
||
| @dataclass | ||
| class WorkflowInfo: | ||
| workflow_type: str | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: If the return type is the same as the input type, we can use generics to indicate that. We can do this as a followup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I've added a generic for this