type | title | linkTitle | weight | description | no_list |
---|---|---|---|---|---|
docs |
Dapr Python SDK integration with Dapr Workflow extension |
Dapr Workflow |
400000 |
How to get up and running with the Dapr Workflow extension |
true |
The Dapr Python SDK provides a built-in Dapr Workflow extension, dapr.ext.workflow
, for creating Dapr services.
You can download and install the Dapr Workflow extension with:
{{< tabs Stable Development>}}
{{% codetab %}}
pip install dapr-ext-workflow
{{% /codetab %}}
{{% codetab %}}
{{% alert title="Note" color="warning" %}}
The development package will contain features and behavior that will be compatible with the pre-release version of the Dapr runtime. Make sure to uninstall any stable versions of the Python SDK extension before installing the dapr-dev
package.
{{% /alert %}}
pip install dapr-ext-workflow-dev
{{% /codetab %}}
{{< /tabs >}}
from time import sleep
import dapr.ext.workflow as wf
wfr = wf.WorkflowRuntime()
@wfr.workflow(name='random_workflow')
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
result2 = yield ctx.call_activity(step2, input=result1)
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
return [result1, result2]
@wfr.activity(name='step1')
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1
@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2
@wfr.activity
def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Do some compensating work
if __name__ == '__main__':
wfr.start()
sleep(10) # wait for workflow runtime to start
wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')
wfr.shutdown()
- Learn more about authoring and managing workflows:
- [How-To: Author a workflow]({{< ref howto-author-workflow.md >}}).
- [How-To: Manage a workflow]({{< ref howto-manage-workflow.md >}}).
- Visit Python SDK examples for code samples and instructions to try out Dapr Workflow:
- [Simple workflow example]({{< ref python-workflow.md >}})
- Task chaining example
- Fan-out/Fan-in example
- Child workflow example
- Human approval example
- Monitor example
{{< button text="Getting started with the Dapr Workflow Python SDK" page="python-workflow.md" >}}