Skip to content

Add support for OAR Scheduler #713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pydra/engine/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
not (bool(shutil.which("qsub")) and bool(shutil.which("qacct"))),
reason="sge not available",
)
# Skip marker for tests that need an OAR installation: the test is skipped
# unless both the `oarsub` and `oarstat` executables are found on PATH.
need_oar = pytest.mark.skipif(
    not all(shutil.which(tool) for tool in ("oarsub", "oarstat")),
    reason="oar not available",
)


def num_python_cache_roots(cache_path: Path) -> int:
Expand Down
175 changes: 175 additions & 0 deletions pydra/workers/oar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import asyncio
import os
import sys
import json
import re
import typing as ty
from tempfile import gettempdir
from pathlib import Path
from shutil import copyfile
import logging
import attrs
from pydra.engine.job import Job, save
from pydra.workers import base


logger = logging.getLogger("pydra.worker")

if ty.TYPE_CHECKING:
from pydra.engine.result import Result

Check warning on line 19 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L19

Added line #L19 was not covered by tests


@attrs.define
class OarWorker(base.Worker):
    """A worker to execute tasks on OAR systems.

    Each job is pickled into a per-job script directory, wrapped in a small
    batch script, submitted with ``oarsub`` and then polled with ``oarstat``
    until it reaches a final state.

    Attributes
    ----------
    poll_delay : int
        Seconds to wait between two ``oarstat`` polls of the same job.
    oarsub_args : str
        Extra command-line arguments forwarded verbatim to ``oarsub``.
    error : dict
        Maps an OAR job id to the path of that job's stderr file. It is
        dropped on pickling and reset to an empty dict on unpickling.
    """

    _cmd = "oarsub"

    # OAR states in which a job has not finished yet. Anything else
    # ("Terminated", "Error", ...) is handed back to the caller of
    # ``_poll_job`` as a final status.
    _pending_states = frozenset(
        {
            "Waiting",
            "Hold",
            "toLaunch",
            "toError",
            "toAckReservation",
            "Launching",
            "Running",
            "Suspended",
            "Resuming",
            "Finishing",
        }
    )

    poll_delay: int = attrs.field(default=1, converter=base.ensure_non_negative)
    oarsub_args: str = ""
    error: dict[str, ty.Any] = attrs.field(factory=dict)

    def __getstate__(self) -> dict[str, ty.Any]:
        """Return state for pickling, without the per-process ``error`` map."""
        state = super().__getstate__()
        state.pop("error", None)
        return state

    def __setstate__(self, state: dict[str, ty.Any]):
        """Set state for unpickling, starting with an empty ``error`` map."""
        state["error"] = {}
        super().__setstate__(state)

    def _prepare_runscripts(self, job, interpreter="/bin/sh", rerun=False):
        """Write the pickled job and its batch script to the script directory.

        Parameters
        ----------
        job : Job or tuple
            Either a ``Job``, or a 2-tuple ``(index, x)`` where ``x`` is used
            both as the path of an existing pickled job (copied into place)
            and as the object providing ``cache_root`` and ``uid``.
            NOTE(review): that double use of ``job[1]`` is inherited from the
            original code - confirm what callers actually pass.
        interpreter : str
            Interpreter written on the shebang line of the batch script.
        rerun : bool
            Forwarded to ``load_and_run`` inside the batch script.

        Returns
        -------
        tuple
            ``(script_dir, batchscript)`` paths.

        Raises
        ------
        Exception
            If the pickled job file is missing or empty.
        """
        if isinstance(job, Job):
            cache_root = job.cache_root
            ind = None
            uid = job.uid
        else:
            assert isinstance(job, tuple), f"Expecting a job or a tuple, not {job!r}"
            assert len(job) == 2, f"Expecting a tuple of length 2, not {job!r}"
            ind = job[0]
            cache_root = job[-1].cache_root
            uid = f"{job[-1].uid}_{ind}"

        script_dir = cache_root / f"{self.plugin_name()}_scripts" / uid
        script_dir.mkdir(parents=True, exist_ok=True)
        job_pkl = script_dir / "_job.pklz"
        if ind is None:
            if not job_pkl.exists():
                save(script_dir, job=job)
        else:
            copyfile(job[1], job_pkl)

        if not job_pkl.exists() or not job_pkl.stat().st_size:
            raise Exception("Missing or empty job!")

        batchscript = script_dir / f"batchscript_{uid}.sh"
        python_string = (
            f"""'from pydra.engine.job import load_and_run; """
            f"""load_and_run("{job_pkl}", rerun={rerun}) '"""
        )
        bcmd = "\n".join(
            (
                f"#!{interpreter}",
                f"{sys.executable} -c " + python_string,
            )
        )
        # NOTE(review): the script is left without owner write permission
        # (0o544), so re-creating it for the same uid may fail for non-root
        # users - confirm whether that situation can occur.
        with batchscript.open("wt") as fp:
            fp.write(bcmd)
        os.chmod(batchscript, 0o544)
        return script_dir, batchscript

    def _failure_message(self, jobid: str) -> str:
        """Build an error message from the last non-empty line of the job's stderr file."""
        unknown = "Job failed (unknown reason - TODO)"
        error_file = self.error.get(jobid)
        if not error_file:
            return unknown
        try:
            lines = Path(error_file).read_text().splitlines()
        except OSError:
            # The stderr file may not exist (e.g. the job died before
            # starting, or the path is relative to the job's own directory).
            return unknown
        error_line = next((line for line in reversed(lines) if line.strip()), "")
        if "Exception" in error_line:
            return error_line.replace("Exception: ", "")
        if "Error" in error_line:
            return error_line.replace("Error: ", "")
        return unknown

    async def run(self, job: "Job[base.TaskType]", rerun: bool = False) -> "Result":
        """Worker submission API.

        Submits ``job`` with ``oarsub`` and polls it until it terminates.

        Parameters
        ----------
        job : Job
            The job to submit.
        rerun : bool
            Forwarded to the batch script (see ``_prepare_runscripts``).

        Returns
        -------
        bool
            ``True`` once OAR reports the job as ``Terminated``.
            NOTE(review): the annotation says ``Result`` but the code returns
            ``True`` - left unchanged for compatibility with callers.

        Raises
        ------
        RuntimeError
            If ``oarsub`` fails, no job id can be extracted from its output,
            or a resubmitted (idempotent) job cannot be located.
        Exception
            If the job ends in any other final state; the message is taken
            from the job's stderr file when available.
        """
        script_dir, batch_script = self._prepare_runscripts(job, rerun=rerun)
        # Warn when the scripts live below the system temp dir, which compute
        # nodes usually do not share with the submitting host.
        if Path(gettempdir()) in Path(script_dir).parents:
            logger.warning("Temporary directories may not be shared across computers")
        script_dir = job.cache_root / f"{self.plugin_name()}_scripts" / job.uid
        sargs = self.oarsub_args.split()

        # Only add name/stdout/stderr options the user did not already provide.
        if not re.search(r"(?<=-n )\S+|(?<=--name=)\S+", self.oarsub_args):
            jobname = ".".join((job.name, job.uid))
            sargs.append(f"--name={jobname}")
        if not re.search(r"(?<=-O )\S+|(?<=--stdout=)\S+", self.oarsub_args):
            output_file = str(script_dir / "oar-%jobid%.out")
            sargs.append(f"--stdout={output_file}")
        # oarsub spells the stderr option -E / --stderr (-e is a job-key option).
        user_stderr = re.search(r"(?<=-E )\S+|(?<=--stderr=)\S+", self.oarsub_args)
        if user_stderr:
            error_template = user_stderr.group(0)
        else:
            error_template = str(script_dir / "oar-%jobid%.err")
            sargs.append(f"--stderr={error_template}")
        sargs.append(str(batch_script))

        # TO CONSIDER: add random sleep to avoid overloading calls
        rc, stdout, stderr = await base.read_and_display_async(
            self._cmd, *sargs, hide_display=True
        )
        if rc:
            raise RuntimeError(f"Error returned from oarsub: {stderr}")
        submitted = re.search(r"OAR_JOB_ID=(\d+)", stdout)
        if not submitted:
            raise RuntimeError("Could not extract job ID")
        jobid = submitted.group(1)
        self.error[jobid] = error_template.replace("%jobid%", jobid)

        # intermittent polling
        while True:
            # 4 possibilities
            # False: job is still pending/working
            # Terminated: job is complete
            # Error + idempotent: job has been stopped and resubmitted with another jobid
            # Error: Job failure
            done = await self._poll_job(jobid)
            if not done:
                await asyncio.sleep(self.poll_delay)
            elif done == "Terminated":
                return True
            elif done == "Error" and "idempotent" in self.oarsub_args:
                logger.debug(
                    f"Job {jobid} has been stopped. Looking for its resubmission..."
                )
                # loading info about task with a specific uid
                info_file = job.cache_root / f"{job.uid}_info.json"
                if info_file.exists():
                    checksum = json.loads(info_file.read_text())["checksum"]
                    lock_file = job.cache_root / f"{checksum}.lock"
                    # Remove a stale lock left by the stopped job so the
                    # resubmitted one is not blocked by it.
                    if lock_file.exists():
                        lock_file.unlink()
                cmd_re = ("oarstat", "-J", "--sql", f"resubmit_job_id='{jobid}'")
                _, stdout, _ = await base.read_and_display_async(
                    *cmd_re, hide_display=True
                )
                new_jobid = next(iter(json.loads(stdout)), None) if stdout else None
                if new_jobid is None:
                    raise RuntimeError(
                        f"Job information about resubmission of job {jobid} not found"
                    )
                # Follow the resubmitted job from now on and remember where
                # its stderr goes so a later failure can still be reported.
                jobid = str(new_jobid)
                self.error[jobid] = error_template.replace("%jobid%", jobid)
            else:
                raise Exception(self._failure_message(jobid))

    async def _poll_job(self, jobid):
        """Query ``oarstat`` for the state of ``jobid``.

        Returns
        -------
        bool or str
            ``False`` while the job is in a non-final state, otherwise the
            state string reported by OAR (e.g. ``"Terminated"``, ``"Error"``).

        Raises
        ------
        RuntimeError
            If ``oarstat`` prints nothing or does not report this job.
        """
        cmd = ("oarstat", "-J", "-s", "-j", jobid)
        logger.debug(f"Polling job {jobid}")
        _, stdout, _ = await base.read_and_display_async(*cmd, hide_display=True)
        if not stdout:
            raise RuntimeError("Job information not found")
        status = json.loads(stdout).get(jobid)
        if status is None:
            raise RuntimeError("Job information not found")
        if status in self._pending_states:
            return False
        return status

Check warning on line 171 in pydra/workers/oar.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/oar.py#L170-L171

Added lines #L170 - L171 were not covered by tests


# Alias so it can be referred to as oar.Worker
# (module-level name bound to the same class object as OarWorker).
Worker = OarWorker
76 changes: 76 additions & 0 deletions pydra/workers/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
need_sge,
need_slurm,
need_singularity,
need_oar,
BasicWorkflow,
BasicWorkflowWithThreadCount,
BasicWorkflowWithThreadCountConcurrent,
Expand Down Expand Up @@ -602,6 +603,81 @@
assert job_1_endtime > job_2_starttime


@need_oar
def test_oar_wf(tmpdir):
    """Run a basic workflow with the OAR worker and check one script dir per task."""
    workflow = BasicWorkflow(x=1)
    workflow.cache_dir = tmpdir
    # submit workflow and every task as oar job
    with Submitter(worker="oar", cache_root=tmpdir) as submitter:
        result = submitter(workflow)

    assert result.outputs.out == 9
    scripts_root = tmpdir / "oar_scripts"
    assert scripts_root.exists()
    # ensure each task was executed with oar
    task_dirs = [entry for entry in scripts_root.listdir() if entry.isdir()]
    assert len(task_dirs) == 2


@need_oar
def test_oar_wf_cf(tmpdir):
    """Submit a whole workflow as a single OAR job that runs with the cf worker."""
    # submit entire workflow as single job executing with cf worker
    workflow = BasicWorkflow(x=1)
    workflow.plugin = "cf"
    with Submitter(worker="oar", cache_root=tmpdir) as submitter:
        result = submitter(workflow)

    assert result.outputs.out == 9
    scripts_root = tmpdir / "oar_scripts"
    assert scripts_root.exists()
    # ensure only workflow was executed with oar
    job_dirs = [entry for entry in scripts_root.listdir() if entry.isdir()]
    assert len(job_dirs) == 1
    # oar scripts should be in the dirs that are using uid in the name
    (only_dir,) = job_dirs
    assert only_dir.basename == workflow.uid

Check warning on line 638 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L638

Added line #L638 was not covered by tests


@need_oar
def test_oar_wf_state(tmpdir):
    """Run a split workflow with the OAR worker and check the script dirs.

    The script directory is named like in the other OAR tests
    (``oar_scripts``); the worker builds it from its plugin name.
    """
    wf = BasicWorkflow().split(x=[5, 6])
    with Submitter(worker="oar", cache_root=tmpdir) as sub:
        res = sub(wf)

    assert res.outputs.out == [9, 10]
    script_dir = tmpdir / "oar_scripts"
    assert script_dir.exists()
    # two script directories are expected per element of the split
    sdirs = [sd for sd in script_dir.listdir() if sd.isdir()]
    assert len(sdirs) == 2 * len(wf.x)

Check warning on line 651 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L651

Added line #L651 was not covered by tests


@need_oar
def test_oar_args_1(tmpdir):
    """testing oarsub_args provided to the submitter"""
    task = SleepAddOne(x=1)
    # submit a single task as an oar job, forwarding extra oarsub options
    with Submitter(worker="oar", cache_root=tmpdir, oarsub_args="-l nodes=2") as sub:
        res = sub(task)

    assert res.outputs.out == 2
    script_dir = tmpdir / "oar_scripts"
    assert script_dir.exists()

Check warning on line 664 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L662-L664

Added lines #L662 - L664 were not covered by tests


@need_oar
def test_oar_args_2(tmpdir):
    """testing oarsub_args provided to the submitter
    exception should be raised for invalid options
    """
    sleep_task = SleepAddOne(x=1)
    bad_args = "-l nodes=2 --invalid"
    # the invalid option must surface as a RuntimeError raised by the worker
    with pytest.raises(RuntimeError, match="Error returned from oarsub:"):
        with Submitter(worker="oar", cache_root=tmpdir, oarsub_args=bad_args) as submitter:
            submitter(sleep_task)

Check warning on line 678 in pydra/workers/tests/test_worker.py

View check run for this annotation

Codecov / codecov/patch

pydra/workers/tests/test_worker.py#L678

Added line #L678 was not covered by tests


def test_hash_changes_in_task_inputs_file(tmp_path):
@python.define
def cache_dir_as_input(out_dir: Directory) -> Directory:
Expand Down
Loading