update task_resume_workflows to also resume processes in CREATED/RESUMED status #984
base: main
Conversation
…MED status
- improve start_process to include logic that happens in both executors.
- move threadpool executor functions to its own file.
ff408fb to 1ac7b2d
CodSpeed Performance Report: Merging #984 will not alter performance.
Codecov Report
Attention: Patch coverage is …

```
@@            Coverage Diff             @@
##             main     #984      +/-   ##
==========================================
- Coverage   84.37%   84.05%   -0.33%
==========================================
  Files         213      214       +1
  Lines       10275    10302      +27
  Branches     1009     1011       +2
==========================================
- Hits         8670     8659      -11
- Misses       1337     1372      +35
- Partials      268      271       +3
```
With drawio you can export a .png file and embed the drawio diagram in the same file; you can use that here as a celery-flow.drawio.png.
```diff
@@ -118,17 +95,15 @@ def resume_process(process_id: UUID, user_inputs: list[State] | None, user: str)

 celery_task = partial(celery.task, log=local_logger, serializer="orchestrator-json")

-@celery_task(name=NEW_TASK)  # type: ignore
+@celery_task(name=NEW_TASK, time_limit=1)  # type: ignore
 def new_task(process_id, workflow_key: str, user: str) -> UUID | None:
```
1 second time limit?
woops
Suggested change:

```diff
-@celery_task(name=NEW_TASK, time_limit=1)  # type: ignore
+@celery_task(name=NEW_TASK)  # type: ignore
```
```diff
+### Celery Workflow/Task flow
+
```
Can you add a brief summary about the diagram? This helps provide documentation readers with some context, and allows people to find it through search keywords.
I think this is the intended situation after this PR is merged?
```diff
@@ -460,19 +460,6 @@ def create_process(
     return pstat


-def thread_start_process(
```
Can you add a line to the PR description that these functions were moved from `orchestrator.service.processes` into the executor folders?
```diff
@@ -567,6 +509,9 @@ def resume_process(
     """
     pstat = load_process(process)

+    if pstat.workflow == removed_workflow:
+        raise ValueError("This workflow cannot be resumed")
```
Suggested change:

```diff
-        raise ValueError("This workflow cannot be resumed")
+        raise ValueError("This workflow cannot be resumed because it has been removed")
```
```diff
     broadcast_func: BroadcastFunc | None = None,
 ) -> UUID:
     if pstat.workflow == removed_workflow:
         raise ValueError("This workflow cannot be started")
```
Suggested change:

```diff
-        raise ValueError("This workflow cannot be started")
+        raise ValueError("This workflow cannot be started because it has been removed")
```
```diff
     local_logger.info("Start task", process_id=process_id, workflow_key=workflow_key)
-    state = retrieve_input_state(process_id, "initial_state").input_state
-    return start_process(process_id, workflow_key, state=state, user=user)
+    return start_process(process_id, user=user)
```
The `initial_state` is never retrieved anymore. Is this intentional? I don't see where we are currently storing the `user_inputs`.
```diff
@@ -80,7 +67,7 @@ def _celery_resume_process(
     user_inputs: list[State] | None = None,
     user: str | None = None,
     **kwargs: Any,
-) -> UUID:
+) -> bool:
```
Now that we're revisiting things; this function calls:

`store_input_state(pstat.process_id, user_inputs, "user_input")`

Could you move this to `resume_process()` in the `processes.py` file? I think it should be done there after the `post_form` validation, with an `if user_inputs:` check to prevent inserting empty input.

This would make it consistent with how it's done for new processes (`start_process()` calls `create_process()`, which then does `store_input_state(process_id, state | initial_state, "initial_state")`).
```diff
+    created_process_ids = get_process_ids_by_process_statuses([ProcessStatus.CREATED])
+    resumed_process_ids = get_process_ids_by_process_statuses([ProcessStatus.RESUMED])
```
I'm not completely sure if this is the way to go.
Imagine this scenario:
- There are separate queues for 1) new workflows (status CREATED) and 2) resumed workflows (status RESUMED)
- Each queue has 50 workflows queued
- For each queue there are 5 workers churning through queued workflows, which they have put on status RUNNING
- When `task_resume_workflows` runs, it finds 45 processes on CREATED and 45 on RESUMED, so it calls `resume_process()` for each of them

Unless I'm missing something, this would re-queue 90 processes that don't need to be re-queued.

In `thread_start_process()` you already changed it to set the workflow to RUNNING before executing any step. This means the chance of the celery worker going AWOL before setting it to RUNNING has become much smaller, so that is already much better. If a worker dies when a process is running, it can be resumed by an engine restart.

So, I don't think we should resume created/resumed workflows in this task.
What do you think?
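The scenario above can be modelled in a few lines. This is a toy simulation with illustrative counts, not code from the PR, just to show how many processes a blanket CREATED/RESUMED resume would pick up:

```python
# Toy model of the race described above: two queues of 50 processes each,
# and 5 workers per queue have already moved their process to RUNNING.
# A task that resumes everything still on CREATED/RESUMED would re-queue
# the remaining 90 processes, even though they are waiting, not stuck.
processes = [{"id": i, "status": "CREATED"} for i in range(50)]
processes += [{"id": 50 + i, "status": "RESUMED"} for i in range(50)]

# 5 workers per queue each pick up one process and set it to RUNNING
for p in processes[:5] + processes[50:55]:
    p["status"] = "RUNNING"

to_requeue = [p["id"] for p in processes if p["status"] in ("CREATED", "RESUMED")]
# 45 CREATED + 45 RESUMED = 90 needless re-queues
```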
```diff
     return thread_start_process(validation_workflow, user_inputs=json)


 THREADPOOL_EXECUTION_CONTEXT: dict[str, Callable] = {
```
Maybe the PR should mention that these dicts were moved, and that the signature of the functions has changed, just in case someone is using them directly.
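The concern is easy to illustrate: a dispatch dict like this is looked up by string key, so an import or lookup keeps working after a move while a changed function signature only fails at call time. The names and signature below are hypothetical, not orchestrator-core's actual API:

```python
# Hypothetical sketch: why a moved dispatch dict with changed signatures can
# break downstream callers silently. The key lookup still succeeds; only the
# call itself would fail if the caller passes the old arguments.
from typing import Callable


def start_validation(workflow_key: str, user_inputs: dict) -> bool:
    # imagine this used to take (workflow_key, json) before the refactor
    return bool(workflow_key and user_inputs)


THREADPOOL_EXECUTION_CONTEXT: dict[str, Callable] = {
    "validate": start_validation,
}

fn = THREADPOOL_EXECUTION_CONTEXT["validate"]   # lookup works regardless of signature
result = fn("task_validate_products", {"ok": True})
```

A type checker won't catch this across a `dict[str, Callable]` boundary, which is why flagging the signature change in the PR description helps direct users of these dicts.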
Related: #898