Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions sleep_for_days/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Sleep for Days

This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days.

To run, first see the main [README.md](../../README.md) for prerequisites.

Then create two terminals and `cd` to this directory.

Run the worker in one terminal:

poetry run python worker.py

And execute the workflow in the other terminal:

poetry run python starter.py

This sample will run indefinitely until you send a signal to `complete`. See how to send a signal via Temporal CLI [here](https://docs.temporal.io/cli/workflow#signal).

1 change: 1 addition & 0 deletions sleep_for_days/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TASK_QUEUE = "sleep-for-days-task-queue"
18 changes: 18 additions & 0 deletions sleep_for_days/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dataclasses import dataclass

from temporalio import activity


@dataclass
class SendEmailInput:
email_msg: str


@activity.defn()
async def send_email(input: SendEmailInput) -> str:
"""
A stub Activity for sending an email.
"""
result = f"Email message: {input.email_msg}, sent"
activity.logger.info(result)
return result
23 changes: 23 additions & 0 deletions sleep_for_days/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import asyncio
import uuid
from typing import Optional

from temporalio.client import Client

from sleep_for_days import TASK_QUEUE
from sleep_for_days.workflows import SleepForDaysWorkflow


async def main(client: Optional[Client] = None):
client = client or await Client.connect("localhost:7233")
wf_handle = await client.start_workflow(
SleepForDaysWorkflow.run,
id=f"sleep-for-days-workflow-id-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)
# Wait for workflow completion (runs indefinitely until it receives a signal)
print(await wf_handle.result())


if __name__ == "__main__":
asyncio.run(main())
27 changes: 27 additions & 0 deletions sleep_for_days/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from sleep_for_days import TASK_QUEUE
from sleep_for_days.activities import send_email
from sleep_for_days.workflows import SleepForDaysWorkflow


async def main():
client = await Client.connect("localhost:7233")

worker = Worker(
client,
task_queue=TASK_QUEUE,
workflows=[SleepForDaysWorkflow],
activities=[send_email],
)

await worker.run()


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
37 changes: 37 additions & 0 deletions sleep_for_days/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from sleep_for_days.activities import SendEmailInput, send_email


@workflow.defn()
class SleepForDaysWorkflow:
def __init__(self) -> None:
self.is_complete = False

@workflow.run
async def run(self) -> str:
while not self.is_complete:
await workflow.execute_activity(
send_email,
SendEmailInput("30 days until the next email"),
start_to_close_timeout=timedelta(seconds=10),
)
await workflow.wait(
[
asyncio.create_task(workflow.sleep(timedelta(days=30))),
asyncio.create_task(
workflow.wait_condition(lambda: self.is_complete)
),
],
return_when=asyncio.FIRST_COMPLETED,
)
Comment on lines +24 to +32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait_condition has a timeout which I think would have been clearer here

return "done!"

@workflow.signal
def complete(self):
self.is_complete = True
Empty file.
53 changes: 53 additions & 0 deletions tests/sleep_for_days/workflow_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import uuid
from datetime import timedelta

from temporalio import activity
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from sleep_for_days.starter import TASK_QUEUE
from sleep_for_days.workflows import SendEmailInput, SleepForDaysWorkflow


async def test_sleep_for_days_workflow():
num_activity_executions = 0

# Mock out the activity to count executions
@activity.defn(name="send_email")
async def send_email_mock(input: SendEmailInput) -> str:
nonlocal num_activity_executions
num_activity_executions += 1
return input.email_msg

async with await WorkflowEnvironment.start_time_skipping() as env:
# if env.supports_time_skipping:
# pytest.skip(
# "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
# )
async with Worker(
env.client,
task_queue=TASK_QUEUE,
workflows=[SleepForDaysWorkflow],
activities=[send_email_mock],
):
handle = await env.client.start_workflow(
SleepForDaysWorkflow.run,
id=str(uuid.uuid4()),
task_queue=TASK_QUEUE,
)

start_time = await env.get_current_time()
# Time-skip 5 minutes.
await env.sleep(timedelta(minutes=5))
# Check that the activity has been called, we're now waiting for the sleep to finish.
assert num_activity_executions == 1
# Time-skip 3 days.
await env.sleep(timedelta(days=90))
# Expect 3 more activity calls.
assert num_activity_executions == 4
# Send the signal to complete the workflow.
await handle.signal(SleepForDaysWorkflow.complete)
# Expect no more activity calls to have been made - workflow is complete.
assert num_activity_executions == 4
# Expect more than 90 days to have passed.
assert (await env.get_current_time() - start_time) > timedelta(days=90)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!