Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend the recursive termination test to match the test in durabletask-go #10

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,11 @@ make test-unit

### Running E2E tests

The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following `docker` command:
The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following command:

```sh
docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator
go install github.com/dapr/durabletask-go@main
durabletask-go --port 4001
```

To run the E2E tests, run the following command from the project root:
Expand Down
73 changes: 45 additions & 28 deletions tests/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,39 +279,56 @@ def orchestrator(ctx: task.OrchestrationContext, _):
assert state.serialized_output == json.dumps("some reason for termination")

def test_terminate_recursive():
def root(ctx: task.OrchestrationContext, _):
result = yield ctx.call_sub_orchestrator(child)
return result
def child(ctx: task.OrchestrationContext, _):
result = yield ctx.wait_for_external_event("my_event")
return result
thread_lock = threading.Lock()
activity_counter = 0
delay_time = 4 # seconds

# Start a worker, which will connect to the sidecar in a background thread
with worker.TaskHubGrpcWorker() as w:
w.add_orchestrator(root)
w.add_orchestrator(child)
w.start()
def increment(ctx, _):
with thread_lock:
nonlocal activity_counter
activity_counter += 1
raise Exception("Failed: Should not have executed the activity")

task_hub_client = client.TaskHubGrpcClient()
id = task_hub_client.schedule_new_orchestration(root)
state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.RUNNING
def orchestrator_child(ctx: task.OrchestrationContext, activity_count: int):
due_time = ctx.current_utc_datetime + timedelta(seconds=delay_time)
yield ctx.create_timer(due_time)
yield ctx.call_activity(increment)

# Terminate root orchestration(recursive set to True by default)
task_hub_client.terminate_orchestration(id, output="some reason for termination")
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
tasks = []
for _ in range(count):
tasks.append(ctx.call_sub_orchestrator(orchestrator_child, input=count))
yield task.when_all(tasks)

# Verify that child orchestration is also terminated
c = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
for recurse in [True, False]:
with worker.TaskHubGrpcWorker() as w:
w.add_activity(increment)
w.add_orchestrator(orchestrator_child)
w.add_orchestrator(parent_orchestrator)
w.start()

task_hub_client = client.TaskHubGrpcClient()
instance_id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=5)

time.sleep(2)

output = "Recursive termination = {recurse}"
task_hub_client.terminate_orchestration(instance_id, output=output, recursive=recurse)


metadata = task_hub_client.wait_for_orchestration_completion(instance_id, timeout=30)

assert metadata is not None
assert metadata.runtime_status == client.OrchestrationStatus.TERMINATED
assert metadata.serialized_output == f'"{output}"'

time.sleep(delay_time)

if recurse:
assert activity_counter == 0, "Activity should not have executed with recursive termination"
else:
assert activity_counter == 5, "Activity should have executed without recursive termination"

task_hub_client.purge_orchestration(id)
state = task_hub_client.get_orchestration_state(id)
assert state is None


def test_continue_as_new():
Expand Down
Loading