Support concurrent execution of the same DAG #786

Open · yohamta opened this issue Jan 10, 2025 · 29 comments

@yohamta (Collaborator) commented Jan 10, 2025

No description provided.

yohamta added the enhancement label on Jan 10, 2025
yohamta added this to the v1.17.0 milestone on Jan 10, 2025
yohamta self-assigned this on Jan 10, 2025
@ghansham:

Out of curiosity, what is the reason? Do we have different params?

@yohamta (Collaborator, Author) commented Jan 10, 2025

There are two reasons:

  1. It allows reusing common functionality that is called from multiple DAGs. Currently, only a single instance per DAG can run at a time.
  2. It will unlock more interesting future functionality, such as for-loops or map-reduce.

@ghansham:

One more query: how do you plan to save history for such parallel DAG runs?

@yohamta (Collaborator, Author) commented Jan 10, 2025

I don't think we need to make changes to the persistence layer or history data. Currently, it uses a Request ID (UUID) to identify individual execution history. Even with parallel execution, I believe we can continue using the same mechanism.

The biggest changes required are in the agent process and the Web UI. Currently, all functionality relies on the fact that only one agent process runs for a particular DAG at a time (e.g., interacting with a running process to get real-time updates). I've been pondering how to manage multiple agent processes to enable parallel execution of a DAG. At the moment, I plan to create a new parent process (a service) that manages a group of agent processes.
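
To make the idea concrete, here is a rough, purely illustrative sketch (not Dagu's actual internals) of a parent service that spawns one agent process per run and addresses each run by a per-run request ID. The class and method names, and the dagu start --params style invocation, are assumptions:

import subprocess
import uuid

class AgentSupervisor:
    """Hypothetical parent service keeping one agent process per run."""

    def __init__(self):
        self.agents = {}  # request_id -> subprocess.Popen

    def start_run(self, dag_file, params=""):
        # Each concurrent run of the same DAG gets its own agent process,
        # keyed by a request ID rather than by the DAG name alone.
        request_id = str(uuid.uuid4())
        cmd = ["dagu", "start", dag_file]
        if params:
            cmd.append(f"--params={params}")
        self.agents[request_id] = subprocess.Popen(cmd)
        return request_id

    def status(self, request_id):
        proc = self.agents[request_id]
        return "running" if proc.poll() is None else f"exited({proc.returncode})"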

@ghansham commented Jan 10, 2025

You may need to append some kind of UUID to the DAG name to uniquely identify it, and that may become your new ID on the agent-process side. On the client side, you may need to identify it using params, because I don't think we will be running the same DAG with exactly the same params.

@yohamta (Collaborator, Author) commented Jan 11, 2025

Currently, the DAG execution history is managed using composite keys consisting of the DAG name and Request ID. The status files are stored in the path data/{DAG name}_{Hash}/{DAG name}-{Timestamp}-{Request ID}.dat. I don't currently see a need to append an ID to the DAG name. Please let me know if I'm overlooking something :)

And yes, parameters can be a way to differentiate executions for users on both the Web UI and CLI.
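
As a small illustration of that composite key, a status-file path could be assembled roughly like this; the hash and timestamp formats below are placeholders, and only the overall layout follows the path format quoted above:

import hashlib
import uuid
from datetime import datetime

def status_file_path(data_dir, dag_name):
    # data/{DAG name}_{Hash}/{DAG name}-{Timestamp}-{Request ID}.dat
    dag_hash = hashlib.md5(dag_name.encode()).hexdigest()[:8]   # placeholder hash
    timestamp = datetime.now().strftime("%Y%m%d.%H%M%S")        # placeholder format
    request_id = uuid.uuid4()                                   # unique per execution
    return f"{data_dir}/{dag_name}_{dag_hash}/{dag_name}-{timestamp}-{request_id}.dat"

print(status_file_path("data", "backup"))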

@ghansham:

One point may be to disallow users from running the same DAG with the same set of params.

@yohamta (Collaborator, Author) commented Jan 11, 2025

I'm not sure if that's necessary, as some common utility DAGs might receive the same parameters, for example target_directory for a backup task. What is the reasoning behind your view that identical parameters should be disallowed?

@ghansham:

Logically, a program or script should generate the same output for the same input. The backup scripts you mentioned run in cron, but not concurrently. And if they are running concurrently, then the source and destination given as command-line args (aka params in Dagu) should be different; otherwise they may corrupt the output files being backed up. For map-reduce tasks and "for" loops, the data should be divided into chunks, with the same DAG running on different parts of the same data.

@yohamta (Collaborator, Author) commented Jan 12, 2025

Ah, I see. You're absolutely right. Yes, normally, users should not run the same DAG with the same set of parameters. That's an excellent point. I'll try to implement a check to prevent it. Thanks for bringing it up!

@ghansham commented Jan 12, 2025

What we are trying to implement actually already exists if we invoke the common DAG as a sub-DAG with different params.

@yohamta (Collaborator, Author) commented Jan 12, 2025

I might be misunderstanding something, but here’s the scenario I’m thinking of:

Let's say we have a common DAG called backup, and we define two separate DAGs like this:

DAG: backup_1:

steps:
  - name: call_backup
    run: backup
    params: DIR=/data1

DAG: backup_2:

steps:
  - name: call_backup
    run: backup
    params: DIR=/data2

If you try to run these two DAGs simultaneously, one of them will fail with the current implementation even though the params are different.

@ghansham:

Oh, that should be implemented. Do you mean that I can't have two DAGs calling the same sub-DAG with different arguments? That feature is very much desirable.

@yohamta (Collaborator, Author) commented Jan 12, 2025

Yes, that's correct. I'll work on implementing this functionality as soon as possible.

@ghansham commented Jan 12, 2025

One possible way could be to suffix a UUID generated from the params onto the DAG ID, and have all the Unix processes we run take it. Logically, we would be treating it as a different DAG.
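
A minimal sketch of that suggestion, with illustrative names only: derive a deterministic suffix from the DAG name plus its params, so each (DAG, params) pair gets a distinct ID, while identical params reproduce the same ID (which also makes duplicate concurrent runs easy to detect):

import hashlib

def run_scoped_dag_id(dag_name, params):
    # Same (dag_name, params) always yields the same suffix; different
    # params yield different IDs, so parallel runs do not collide.
    digest = hashlib.sha1(f"{dag_name}|{params}".encode()).hexdigest()[:12]
    return f"{dag_name}-{digest}"

print(run_scoped_dag_id("backup", "DIR=/data1"))  # e.g. backup-3f0c...
print(run_scoped_dag_id("backup", "DIR=/data2"))  # different suffix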

@yohamta (Collaborator, Author) commented Jan 12, 2025

Oh, that sounds like a great idea for implementing it quickly and easily. I'll give it a try.

@ghansham commented Jan 12, 2025

We can't ignore the case where multiple sub-DAGs are called from a single master DAG. I think this case will also be handled by the above-mentioned approach. We can call these DAGs composable DAGs.

Is there a way we can validate the set of params specified before running a DAG? That would help in bringing uniformity across multiple runs of the same DAG with different sets of params. I am not sure, though.

@yohamta (Collaborator, Author) commented Jan 12, 2025

Could you elaborate on what kind of validation you have in mind? For example, are you thinking about enforcing specific rules for allowed parameter values or ensuring that no two runs of the same DAG use conflicting parameters?

@ghansham:

I think when we are using a common DAG, it is enforced by design, so it is not required. You can ignore this.

@wakatara commented Feb 6, 2025

I can give a good example of this that I am currently using in Airflow (and just asked about this in the Discord).

I have a scientific imaging application which gets new telescope data every day. I have a DAG that sweeps the incoming directory for new files and then, for each file, launches a sub-DAG composed of tasks for description, calibration, and photometry before inserting the results into a database. Run sequentially, without concurrent/parallel executions of the sub-DAG, this would take days for each day's input. If I'm running 128 of those DAGs at the same time (and I'm pretty sure I could run more in Go than I do in Airflow), it takes about 2 hours each day.

Keen to see this feature implemented, since stable concurrent execution was one of the things that prevented me from using Prefect over Airflow. And Airflow has been cantankerous.

(Whereas I'm thinking that, with a little work, I could integrate Dagu directly into the docker-compose of our main Go API app and massively simplify our architecture.)

@ghansham commented Feb 6, 2025

There is a much simpler way to do this, and it is exactly what we are doing. We create a template YAML and keep monitoring the directory, waiting for new files. Whenever new data arrives, we create a YAML from the template. The YAML has a processing sequence in which each process expects a unique ID (e.g. time of acquisition), and we set that unique ID as a variable within the YAML while creating it in the DAG path from the template. Then we simply issue a POST request to start the DAG. I am pretty sure you can use this for your use case, and I can help you set it up. The downside is that the processing sequence gets replicated, but that will be fixed the moment the current issue is resolved and we can run a sub-DAG concurrently with different unique IDs as arguments. We are getting around 72 to 360 datasets per day, with a processing sequence of around 70+ processes, and we have been running this for the last 8 months without fail. We are able to run multiple DAGs concurrently, all thanks to the Dagu architecture.

The advantage of the above-mentioned approach (creating a separate YAML for each acquisition time) is that we can track the processing status of the datasets.

@ghansham commented Feb 6, 2025

@wakatara how many DAGs are you running per day, and how compute-intensive are they? What data volume are you processing? What kind of data is the telescope providing, and what kind of corrections are you applying? If possible, can you share sample data and details of the corrections you are applying?

@wakatara commented Feb 7, 2025

@ghansham
Sure... the current service (which I need to scale out) takes roughly 2000 FITS files a day (a file format used for imaging in astronomy) -- note this will increase quite a bit as we add more observing projects -- and then runs them through a number of APIs which return results, pass them on to the next task, and then give results.

As mentioned, the key issue is running these files in parallel. The tasks themselves are merely API calls, though they can take quite a bit of time to compute (I hit a number of science backend endpoints, though those are calculated completely separately and are not, at least currently, part of the application). So this is mostly calling APIs, getting results, and then passing those results to the next call, which gets more information.

Without boring you too much on how the sausage gets made, the important pipeline function looks like this (written as Python tasks for Airflow... I've written them so they should be easily transferable as Python tasks to trigger via Dagu, or Prefect, the other contender right now):

import os
import random
import time

import tasks as t  # helper module (tasks.py) with the processing functions used below

def sci_backend_processing(**kwargs):
    # file = kwargs['dag_run'].conf['unprocessed_file']
    file = kwargs["file"]
    print(f"Processing file: {file}")
    scratch = t.copy_to_scratch(file)

    description = t.describe_fits(file, scratch)
    description = t.identify(file, description)

    orbit_job_id = t.object_orbit_submit(description["OBJECT"])
    time.sleep(random.uniform(25, 35))
    orbit = t.object_orbit(orbit_job_id, description)

    description = t.flight_checks(description)
    description = t.get_database_ids(description)

    description["SOURCE-FILEPATH"] = description["FITS-FILE"]
    filename = os.path.basename(description["SOURCE-FILEPATH"])
    path = f"/data/staging/datalake/{ description['PDS4-LID'] }/{ description['ISO-DATE-LAKE'] }/{ description['INSTRUMENT'] }/{filename}"
    description["LAKE-FILEPATH"] = path

    print(f"Description hash: { description }")

    # Calibration and ATLAS pre-calibrated override
    filepath = os.path.normpath(file).split(os.path.sep)
    if filepath[-4] == "atlas":
        calibration = t.calibrate_fits_atlas(scratch, file)
    else:
        calibration_job_id = t.calibrate_fits_submit(scratch, file)
        time.sleep(random.uniform(60, 90))
        calibration = t.calibrate_fits(calibration_job_id, description)

    # Photometry submit and retrieval
    photom_job_id = t.photometry_fits_submit(
        scratch, file, description["OBJECT"], "APERTURE"
    )
    filepath = os.path.normpath(file).split(os.path.sep)
    if filepath[-4] == "atlas":
        time.sleep(random.uniform(20, 30))
    if filepath[-5] == "gemini":
        print(
            "Gemini file: Taking longer on photometry wait due to Gemini file size photometry."
        )
        time.sleep(random.uniform(60, 90))
    photometry = t.photometry_fits(photom_job_id, description)

    ephem_job_id = t.object_ephemerides_submit(description, orbit)
    time.sleep(random.uniform(35, 45))
    ephemerides = t.object_ephemerides(ephem_job_id, description)

    orbit_coords_id = t.record_orbit_submit(description["OBJECT"], orbit)
    time.sleep(random.uniform(20, 30))
    orbit_coords = t.record_orbit(orbit_coords_id, description)

    t.database_inserts(description, calibration, photometry, ephemerides, orbit_coords)
    t.move_to_datalake(scratch, description)

So,

  1. Acquires a file from the fan-out process (running about 128 of these concurrently in Airflow atm) and moves it to the scratch area (since we modify the files via calibration etc.).
  2. Gets the headers from the FITS file, which contain various metadata about the image, target, telescope, etc.
  3. Identifies the target (important if there is no identifying data in the file or the orbit is out of whack, i.e. someone thought they were looking at something different than they actually were; happens sometimes).
  4. Calculates the orbit of the object for this apparition (important since it takes a while, and having it cached is useful later).
  5. Runs a calibration on the image and adds extra metadata (basically adjusts for stars etc. so as to prepare for running photometry).
  6. Runs photometry on the image (calculates things like magnitude, errors, etc.).
  7. Inserts everything into the database tables (image, photometry, etc.).
  8. Moves the modified scratch file to the data lake (just a semantically structured file store, really).

Does that provide a good enough overview?

My key issue is that we will need to ramp this up somewhat as we get more data sources coming online and ingesting. As well, things get really interesting when we get to large projects coming online, which will probably require streaming and such...

Hope that helps. Airflow has been useful, but a real problem child, so I would love to replace it with something simpler and, as you say in your README, more dev-friendly.

thanks!

@wakatara commented Feb 7, 2025

@ghansham The simpler way you allude to -- I'd love to see it if you can put the code or gists up somewhere I could take a peek. I spent a number of weeks trying to get Prefect working (which is vastly simpler, but had many issues with concurrent execution in its 2.x version, mostly due to memory leaks). Keen to also get this working in Go, since I trust its concurrency model much more and, well... it's been rock solid with our APIs and task queuing to date. Most of my issues right now are with Airflow.

@ghansham commented Feb 7, 2025

I could not find the definition of the variable 't' in the above code snippet.

@wakatara commented Feb 7, 2025

@ghansham The above is the function that does the processing, to give you an idea; the actual tasks.py (which is the t in the above) is a separate Python file with the functions it calls. I can, of course, post these, but it's quite a bit of code. Nothing complex or crazy, but it's not of trivial length... =] The missing bit is import tasks as t at the top of the DAG's code, but as I said, this is illustrative, to give an idea of the actual processing pipeline. The file fan-out scoops up all new files and then creates multiple DAGs of sci_backend_processing, each with a different file (and then Airflow processes about 128 of those at a time).

Lemme know if drawing a diagram or such might help, if that's not clear...

@ghansham commented Feb 7, 2025

I understand now.

@ghansham commented Feb 8, 2025

If you can identify the files you are receiving by some unique ID (say, date/time), start creating YAML files like this:

YAML filename: hubble1_20250104_050000.yaml

And create the content of this YAML like this:

params: 
  - UNIQUE_ID: hubble1_20250104_050000
steps:
  - name: backend_science_processing
    command: python /home/sciwork/bin/sci_proc_driver.py $UNIQUE_ID

You can include more steps based on your processing pipeline.
Use the UNIQUE_ID param, passed as a command-line argument, to construct the filename that initiates the processing in the driver code, and pass it into the Python kwargs dict used by sci_backend_processing(**kwargs) -- the one you reference in your code as:

file = kwargs["file"]
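
A minimal sketch of such a driver, under a couple of assumptions: the UNIQUE_ID maps to a FITS file in a known input directory, and sci_backend_processing() is importable from a module here called pipeline (the directory and module name are made up):

# sci_proc_driver.py (illustrative only)
import os
import sys

from pipeline import sci_backend_processing  # hypothetical module holding the function above

INPUT_DIR = "/data/incoming"  # assumed location of newly arrived FITS files

def main():
    unique_id = sys.argv[1]                                   # passed by the Dagu step as $UNIQUE_ID
    fits_file = os.path.join(INPUT_DIR, f"{unique_id}.fits")  # reconstruct the incoming filename
    sci_backend_processing(file=fits_file)                    # same kwarg the pipeline expects

if __name__ == "__main__":
    main()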

Next, write an ingestor program that runs as a daemon: it keeps watching the input directory, waiting for new files to arrive, and for every new file it creates such a YAML from a template YAML stored in a fixed location, changing the params in the YAML based on the file you want to process, and copies it to DAG_PATH. Let me know if you have any queries about this ingestor program. This is something we wrote outside Dagu. We used Go in the pursuit of learning the language, but it can be written in any language, like Python or C++, or even using bash scripting.

Then you can issue a POST request from the ingestor program to start processing for that YAML. For creating POST requests that initiate Dagu processing, refer to:

https://dagu.readthedocs.io/en/latest/rest.html#submit-dag-action-post-api-v1-dags-name

For example, using curl

curl -H 'Content-Type: application/json' -d '{"Action": "start"}' -X POST http://ip:port/api/v1/dags/dagname

Depending on whether you are running it as HTTP or HTTPS, you can change the Dagu URL.
Thus you can ingest as many files as you want.
Once the proof of concept is done, you can run this daemon as a systemd service.
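
A minimal sketch of such an ingestor, under stated assumptions: it polls the input directory, renders a per-file YAML from a template containing a __UNIQUE_ID__ placeholder into the DAG directory, and then issues the same POST as the curl example above. All paths, the placeholder, and the polling interval are made up:

import os
import time

import requests

INPUT_DIR = "/data/incoming"
TEMPLATE = "/etc/dagu/template.yaml"       # template with a __UNIQUE_ID__ placeholder
DAG_PATH = "/home/sciwork/.dagu/dags"      # directory Dagu reads DAG files from
DAGU_URL = "http://localhost:8080/api/v1/dags"

def render_dag(unique_id):
    # Create <unique_id>.yaml in DAG_PATH from the template.
    with open(TEMPLATE) as f:
        content = f.read().replace("__UNIQUE_ID__", unique_id)
    with open(os.path.join(DAG_PATH, f"{unique_id}.yaml"), "w") as f:
        f.write(content)
    return unique_id

def main():
    seen = set()
    while True:
        for name in os.listdir(INPUT_DIR):
            unique_id, ext = os.path.splitext(name)
            if ext != ".fits" or unique_id in seen:
                continue
            seen.add(unique_id)
            dag_name = render_dag(unique_id)
            # Same request as the curl example above, just from Python.
            requests.post(f"{DAGU_URL}/{dag_name}", json={"Action": "start"})
        time.sleep(10)  # simple polling; a filesystem-watch library would also work

if __name__ == "__main__":
    main()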

Just one query: how are you monitoring the input files for arrival?
