
Conversation

nvnieuwk
Contributor

@nvnieuwk nvnieuwk commented Aug 5, 2025

This PR implements actual queuing in the v03 pipeline by expanding the current system.

Main changes

  1. Whenever a valid POST request is sent to the loading_pipeline_enqueue route, a new queue file is created with a name of the form request_<uuid>.json, where <uuid> is a unique identifier that prevents filename collisions
  2. The pipeline worker checks whether the queue directory is empty; when it is not, the worker takes the oldest queue file and runs the job corresponding to that file. When the job is done, the file is deleted and the next one is started

@nvnieuwk nvnieuwk requested a review from a team as a code owner August 5, 2025 15:28
@@ -92,6 +92,7 @@ async def test_loading_pipeline_enqueue(self):
'projects_to_run': ['project_a'],
'reference_genome': 'GRCh38',
'sample_type': 'WGS',
'skip_check_sex_and_relatedness': False,
Collaborator

Was this just a stray line?

Contributor Author

The tests were failing because that line was missing. No idea why, but it didn't seem related to my changes.

Collaborator

Interesting, I will take a look into this as it is unexpected!

Contributor Author

Thank you!

@@ -1,6 +1,8 @@
import hashlib
import os

from uuid import uuid1
Collaborator

I think it might actually be better to pass the request_id (currently a timestamp) assigned by the pipeline_worker into loading_pipeline_queue_path() instead. That value is lexically sortable, for example:

20250805-123456
20250805-223000
20250806-001500
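
To illustrate why this works: a `YYYYMMDD-HHMMSS` timestamp sorts lexically in the same order as chronologically, so plain string sorting of the queue files yields FIFO order. A small sketch (the `run_id` helper is hypothetical):

```python
from datetime import datetime


def run_id(ts: datetime) -> str:
    # Zero-padded fields make string order match time order.
    return ts.strftime('%Y%m%d-%H%M%S')


ids = [
    run_id(datetime(2025, 8, 6, 0, 15, 0)),
    run_id(datetime(2025, 8, 5, 12, 34, 56)),
    run_id(datetime(2025, 8, 5, 22, 30, 0)),
]
# Plain string sorting recovers chronological order.
ordered = sorted(ids)
```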

Contributor Author

Good idea! I'll give it a go

Contributor Author

I've ultimately decided to do a combination of both methods. A queue file would look something like this: request_20250805-123456_<uuid>.json. I chose to keep the uuid in there to prevent losing jobs when multiple jobs are submitted within the same second
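
A sketch of the combined naming scheme (the `queue_file_name` helper and the regex are illustrative assumptions): the timestamp keeps the names sortable, while the uuid suffix separates same-second submissions and can be stripped back out downstream.

```python
import re
import uuid


def queue_file_name(run_id: str) -> str:
    # run_id is the lexically sortable timestamp; the uuid suffix keeps
    # two submissions within the same second from colliding.
    return f'request_{run_id}_{uuid.uuid1().int}.json'


name = queue_file_name('20250805-123456')
# The run id can be recovered with a regex that ignores the uuid part.
match = re.fullmatch(r'request_(\d{8}-\d{6})_\d+\.json', name)
```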

f'request_{uuid1().int}.json',
)

def get_oldest_queue_path() -> str:
Collaborator

This method might actually fit nicely in runs.py. You'll probably want to abstract

os.path.join(LOCAL_DISK_MOUNT_PATH, 'loading_pipeline_queue')

into a helper shared between loading_pipeline_queue_path(run_id: str) and this method!

Contributor Author

I moved the method to runs.py :)

@bpblanken
Collaborator

@nvnieuwk Thanks for contributing! This has been on our roadmap and is definitely needed. I left a few comments, and we'll need to get the build passing, but the idea looks solid to me.

@@ -35,24 +35,6 @@ async def loading_pipeline_enqueue(request: web.Request) -> web.Response:
except ValueError as e:
raise web.HTTPBadRequest from e

try:
Collaborator

we don't need to do this on this PR, but a back-pressure notion of "there are too many files in the queue" would make this more complete! I can add a ticket.

Contributor Author

Good idea, what would be 'too many files' in this case?

Collaborator

what about 5 or 10?

Contributor Author

We would probably want to queue more in our case. Would it be bad if the limit is higher? (I'm thinking 1000 even)

Collaborator

We could make it an env var? The reason for the low suggested limit is just the simplicity of the mechanism. If you're reliably queuing up dozens of requests from the UI, you might want to consider joint calling your VCF and loading multiple projects at once.

We're actively trying to make the pipeline more performant and better able to support concurrent loads as well, which will hopefully ease some of this burden!

Contributor Author

That's a good idea, thank you! We sadly can't do analysis on large cohorts at our lab for several reasons, but having a way to configure that limit might be the best way to handle this.

Also really looking forward to the concurrent loads!

Contributor Author

I added the LOADING_QUEUE_LIMIT environment variable (default 10) for this. The app first checks whether the queue is full before adding a file to it and returns a 409 error if it is.

)
return os.path.join(
loading_pipeline_queue_dir(),
f'request_{run_id}_{uuid.uuid1().int}.json',
Collaborator

It might be simpler to just use random.randint(0, 99) or something similar here. We run into issues with run_ids being too big in our system sometimes, so just keeping it less verbose would be nice!

Collaborator

Ah, I see that you're not including it when you regex parse downstream. I think if you shorten the randomness you should be fine to include it as part of the run id.

Contributor Author

@nvnieuwk nvnieuwk Aug 7, 2025

Good idea, I'll just use the first 5 characters or something like that

Collaborator

@bpblanken bpblanken left a comment

@nvnieuwk thanks for incorporating the feedback! Looks like I might need to move a few imports around to get the build passing, but things look fully functional and correct to me!

@nvnieuwk
Contributor Author

nvnieuwk commented Aug 7, 2025

Awesome! I can also take a look at it tomorrow if you haven't done it by then :)

@bpblanken bpblanken merged commit 7d6278f into broadinstitute:main Aug 7, 2025
1 check failed
@nvnieuwk nvnieuwk deleted the real-queue branch August 8, 2025 07:38