Skip to content

Updates workflows docs to use the new workflows client #780

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 0 additions & 84 deletions daprdocs/content/en/python-sdk-docs/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -515,89 +515,5 @@ def main():
- For a full list of state operations visit [How-To: Use the cryptography APIs]({{< ref howto-cryptography.md >}}).
- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples/crypto) for code samples and instructions to try out cryptography

### Workflow

```python
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient

instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
eventName = "event1"
eventData = "eventData"

def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()

# Start the workflow
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")

# ...

# Pause Test
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")

# Resume Test
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")

sleep(1)
# Raise event
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)

sleep(5)
# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")


# Kick off another workflow for termination purposes
# This will also test using the same instance ID on a new workflow after
# the old instance was purged
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")

# Terminate Test
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")

# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")

workflowRuntime.shutdown()
```

- Learn more about authoring and managing workflows:
- [How-To: Author a workflow]({{< ref howto-author-workflow.md >}}).
- [How-To: Manage a workflow]({{< ref howto-manage-workflow.md >}}).
- Visit [Python SDK examples](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py) for code samples and instructions to try out Dapr Workflow.


## Related links
[Python SDK examples](https://github.com/dapr/python-sdk/tree/master/examples)
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ description: How to get up and running with the Dapr Workflow extension
no_list: true
---

{{% alert title="Note" color="primary" %}}
Dapr Workflow is currently in alpha.
{{% /alert %}}

The Dapr Python SDK provides a built in Dapr Workflow extension, `dapr.ext.workflow`, for creating Dapr services.
The Dapr Python SDK provides a built-in Dapr Workflow extension, `dapr.ext.workflow`, for creating Dapr services.

## Installation

Expand All @@ -31,12 +27,79 @@ The development package will contain features and behavior that will be compatib
{{% /alert %}}

```bash
pip3 install dapr-ext-workflow-dev
pip install dapr-ext-workflow-dev
```
{{% /codetab %}}

{{< /tabs >}}

## Example

```python
from time import sleep

import dapr.ext.workflow as wf


wfr = wf.WorkflowRuntime()


@wfr.workflow(name='random_workflow')
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
result2 = yield ctx.call_activity(step2, input=result1)
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
return [result1, result2]


@wfr.activity(name='step1')
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1


@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2

@wfr.activity
def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Do some compensating work


if __name__ == '__main__':
wfr.start()
sleep(10) # wait for workflow runtime to start

wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')

wfr.shutdown()
```

- Learn more about authoring and managing workflows:
- [How-To: Author a workflow]({{< ref howto-author-workflow.md >}}).
- [How-To: Manage a workflow]({{< ref howto-manage-workflow.md >}}).
-
- Visit [Python SDK examples](https://github.com/dapr/python-sdk/tree/main/examples/workflow) for code samples and instructions to try out Dapr Workflow:
- [Simple workflow example]({{< ref python-workflow.md >}})
- [Task chaining example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py)
- [Fan-out/Fan-in example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/fan_out_fan_in.py)
- [Child workflow example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/child_workflow.py)
- [Human approval example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/human_approval.py)
- [Monitor example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/monitor.py)


## Next steps

{{< button text="Getting started with the Dapr Workflow Python SDK" page="python-workflow.md" >}}
Loading