Skip to content

Commit 24dc0d5

Browse files
authored
slurm: support clusters without sacct (#1070)
1 parent ae72fa2 commit 24dc0d5

File tree

4 files changed

+185
-3
lines changed

4 files changed

+185
-3
lines changed

torchx/cli/cmd_list.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
HANDLE_HEADER = "APP HANDLE"
2323
STATUS_HEADER = "APP STATUS"
24+
NAME_HEADER = "APP NAME"
2425

2526

2627
class CmdList(SubCommand):
@@ -39,5 +40,7 @@ def add_arguments(self, subparser: argparse.ArgumentParser) -> None:
3940
def run(self, args: argparse.Namespace) -> None:
4041
with get_runner() as runner:
4142
apps = runner.list(args.scheduler)
42-
apps_data = [[app.app_handle, str(app.state)] for app in apps]
43-
print(tabulate(apps_data, headers=[HANDLE_HEADER, STATUS_HEADER]))
43+
apps_data = [[app.app_handle, app.name, str(app.state)] for app in apps]
44+
print(
45+
tabulate(apps_data, headers=[HANDLE_HEADER, NAME_HEADER, STATUS_HEADER])
46+
)

torchx/schedulers/api.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class ListAppResponse:
8686
app_id: str
8787
state: AppState
8888
app_handle: str = "<NOT_SET>"
89+
name: str = ""
8990

9091
# Implementing __hash__() makes ListAppResponse hashable which makes
9192
# it easier to check if a ListAppResponse object exists in a list of

torchx/schedulers/slurm_scheduler.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,12 @@ def _cancel_existing(self, app_id: str) -> None:
482482
subprocess.run(["scancel", app_id], check=True)
483483

484484
def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
    """Describe the Slurm job ``app_id``, preferring ``sacct`` over ``squeue``.

    ``sacct`` is tried first. If it exits with a non-zero status or the
    ``sacct`` executable cannot be found at all, the request is retried
    through ``squeue``.

    Args:
        app_id: Slurm job id, forwarded to the backend's ``-j`` option.

    Returns:
        Whatever the backend that handled the request returns.

    Raises:
        subprocess.CalledProcessError: if the ``squeue`` fallback exits
            with a non-zero status.
        FileNotFoundError: if the ``squeue`` executable is missing as well.
    """
    try:
        return self._describe_sacct(app_id)
    except (subprocess.CalledProcessError, FileNotFoundError):
        # CalledProcessError: sacct ran but failed (e.g. accounting is not
        # configured). FileNotFoundError: the sacct binary is not installed,
        # which subprocess.run reports before any exit status exists.
        return self._describe_squeue(app_id)
489+
490+
def _describe_sacct(self, app_id: str) -> Optional[DescribeAppResponse]:
485491
p = subprocess.run(
486492
["sacct", "--parsable2", "-j", app_id], stdout=subprocess.PIPE, check=True
487493
)
@@ -534,6 +540,48 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
534540
msg=msg,
535541
)
536542

543+
def _describe_squeue(self, app_id: str) -> Optional[DescribeAppResponse]:
    """Describe ``app_id`` from the output of ``squeue --json -j <app_id>``.

    Every job entry whose name has the form ``<role>-<replica_id>`` is
    recorded as one replica of ``<role>``. The response's ``state`` and
    ``msg`` come from the last job entry in the output.

    Args:
        app_id: Slurm job id to query.

    Returns:
        A ``DescribeAppResponse``. When ``squeue`` reports no jobs the state
        is ``AppState.UNKNOWN`` and the role lists are empty.

    Raises:
        subprocess.CalledProcessError: if ``squeue`` exits non-zero.
        AssertionError: if a reported state has no entry in ``SLURM_STATES``.
    """
    p = subprocess.run(
        ["squeue", "--json", "-j", app_id], stdout=subprocess.PIPE, check=True
    )
    output_json = json.loads(p.stdout.decode("utf-8"))

    roles = {}
    roles_statuses = {}
    msg = ""
    app_state = AppState.UNKNOWN
    for job in output_json["jobs"]:
        # job_state is normally a list whose first element is the state
        # name. NOTE(review): some Slurm releases appear to emit a plain
        # string instead; indexing that would yield only its first
        # character, so accept both shapes.
        raw_state = job["job_state"]
        state = raw_state if isinstance(raw_state, str) else raw_state[0]
        msg = state
        state_enum = SLURM_STATES.get(state)
        assert (
            state_enum
        ), f"failed to translate slurm state {state} to torchx state"
        app_state = state_enum

        role, _, replica_id = job["name"].rpartition("-")
        if not replica_id or not role:
            # The name is expected to look like "<role>-<replica_id>";
            # entries without both parts cannot be mapped to a replica.
            continue
        if role not in roles:
            roles[role] = Role(name=role, num_replicas=0, image="")
            roles_statuses[role] = RoleStatus(role, [])
        roles[role].num_replicas += 1
        roles_statuses[role].replicas.append(
            ReplicaStatus(
                id=int(replica_id), role=role, state=app_state, hostname=""
            ),
        )

    return DescribeAppResponse(
        app_id=app_id,
        roles=list(roles.values()),
        roles_statuses=list(roles_statuses.values()),
        state=app_state,
        msg=msg,
    )
584+
537585
def log_iter(
538586
self,
539587
app_id: str,
@@ -574,6 +622,12 @@ def log_iter(
574622
return iterator
575623

576624
def list(self) -> List[ListAppResponse]:
    """List Slurm jobs, preferring ``sacct`` and falling back to ``squeue``.

    ``sacct`` is tried first. If it exits with a non-zero status or the
    ``sacct`` executable cannot be found at all, the listing is retried
    through ``squeue``.

    Returns:
        The list produced by whichever backend handled the request.

    Raises:
        subprocess.CalledProcessError: if the ``squeue`` fallback exits
            with a non-zero status.
        FileNotFoundError: if the ``squeue`` executable is missing as well.
    """
    try:
        return self._list_sacct()
    except (subprocess.CalledProcessError, FileNotFoundError):
        # CalledProcessError: sacct ran but failed. FileNotFoundError: the
        # sacct binary is not installed, which subprocess.run reports
        # before any exit status exists.
        return self._list_squeue()
629+
630+
def _list_sacct(self) -> List[ListAppResponse]:
577631
# By default sacct only returns accounting information of jobs launched on the current day
578632
# To return all jobs launched, set starttime to one second past unix epoch time
579633
# Starttime will be modified when listing jobs by timeframe is supported
@@ -590,6 +644,38 @@ def list(self) -> List[ListAppResponse]:
590644
for job in output_json["jobs"]
591645
]
592646

647+
def _list_squeue(self) -> List[ListAppResponse]:
    """List jobs from the output of ``squeue --json``.

    Used when ``sacct`` is unavailable. For a heterogeneous job only the
    entry whose ``job_id`` equals the ``het_job_id`` number is kept; the
    other component entries are skipped.

    Returns:
        One ``ListAppResponse`` per retained job entry, in ``squeue`` order.

    Raises:
        subprocess.CalledProcessError: if ``squeue`` exits non-zero.
        KeyError: if a reported state has no entry in ``SLURM_STATES``.
    """
    # if sacct isn't configured on the cluster, fallback to squeue which
    # only has currently running jobs
    p = subprocess.run(
        ["squeue", "--json"],
        stdout=subprocess.PIPE,
        check=True,
    )
    output_json = json.loads(p.stdout.decode("utf-8"))

    out = []
    for job in output_json["jobs"]:
        job_id = job["job_id"]

        # Skip entries that belong to a heterogeneous job but are not its
        # leader. A het_job_id number of 0 means the field carries no id.
        het_job_id = job.get("het_job_id")
        if (
            het_job_id
            and het_job_id["set"]
            and het_job_id["number"] != job_id
            and het_job_id["number"] > 0
        ):
            continue

        # job_state is normally a list whose first element is the state
        # name. NOTE(review): some Slurm releases appear to emit a plain
        # string instead; indexing that would yield only its first
        # character, so accept both shapes.
        raw_state = job["job_state"]
        state = raw_state if isinstance(raw_state, str) else raw_state[0]

        out.append(
            ListAppResponse(
                app_id=str(job_id),
                state=SLURM_STATES[state],
                name=job["name"],
            )
        )
    return out
678+
593679

594680
def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler:
595681
return SlurmScheduler(

torchx/schedulers/test/slurm_scheduler_test.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,48 @@ def test_describe_running(self, run: MagicMock) -> None:
399399
self.assertEqual(out.state, specs.AppState.RUNNING)
400400

401401
@patch("subprocess.run")
402-
def test_list(self, run: MagicMock) -> None:
402+
def test_describe_squeue(self, run: MagicMock) -> None:
    """``_describe_squeue`` runs ``squeue --json -j`` once and reports RUNNING."""
    # Two RUNNING entries that share het_job_id 1236.
    squeue_stdout = b"""{
    "jobs": [
        {
            "job_id": 1236,
            "name": "foo-0",
            "job_state": ["RUNNING"],
            "het_job_id": {
                "set": true,
                "infinite": false,
                "number": 1236
            }
        },
        {
            "job_id": 1237,
            "name": "foo-1",
            "job_state": ["RUNNING"],
            "het_job_id": {
                "set": true,
                "infinite": false,
                "number": 1236
            }
        }
    ]
}"""
    run.return_value.stdout = squeue_stdout

    described = create_scheduler("foo")._describe_squeue("54")

    # Exactly one subprocess call, carrying the expected squeue command line.
    run.assert_called_once_with(
        ["squeue", "--json", "-j", "54"], stdout=subprocess.PIPE, check=True
    )

    self.assertIsNotNone(described)
    self.assertEqual(described.app_id, "54")
    self.assertEqual(described.msg, "RUNNING")
    self.assertEqual(described.state, specs.AppState.RUNNING)
441+
442+
@patch("subprocess.run")
443+
def test_list_sacct(self, run: MagicMock) -> None:
403444
run.return_value.stdout = b"""{\n "meta": {\n },\n "errors": [\n ],\n "jobs": [
404445
\n {\n "account": null,\n "job_id": 123,\n "name": "main-0",
405446
\n "state": {\n "current": "COMPLETED",\n "reason": "None"},
@@ -416,6 +457,57 @@ def test_list(self, run: MagicMock) -> None:
416457
self.assertIsNotNone(apps)
417458
self.assertEqual(apps, expected_apps)
418459

460+
@patch("subprocess.run")
def test_list_squeue(self, run: MagicMock) -> None:
    """``_list_squeue`` keeps plain jobs and only the leader of a het job."""
    # 1234: no het_job_id; 1235: het_job_id number 0; 1236/1237: both carry
    # het_job_id 1236, so only 1236 is expected in the listing.
    squeue_stdout = b"""{
    "jobs": [
        {
            "job_id": 1234,
            "name": "foo",
            "job_state": ["FAILED"]
        },
        {
            "job_id": 1235,
            "name": "foo",
            "job_state": ["FAILED"],
            "het_job_id": {
                "set": true,
                "infinite": false,
                "number": 0
            }
        },
        {
            "job_id": 1236,
            "name": "foo-0",
            "job_state": ["RUNNING"],
            "het_job_id": {
                "set": true,
                "infinite": false,
                "number": 1236
            }
        },
        {
            "job_id": 1237,
            "name": "foo-1",
            "job_state": ["RUNNING"],
            "het_job_id": {
                "set": true,
                "infinite": false,
                "number": 1236
            }
        }
    ]
}"""
    run.return_value.stdout = squeue_stdout

    listed = create_scheduler("foo")._list_squeue()

    self.assertIsNotNone(listed)
    self.assertEqual(
        listed,
        [
            ListAppResponse(app_id="1234", state=AppState.FAILED, name="foo"),
            ListAppResponse(app_id="1235", state=AppState.FAILED, name="foo"),
            ListAppResponse(app_id="1236", state=AppState.RUNNING, name="foo-0"),
        ],
    )
510+
419511
@patch("subprocess.run")
420512
def test_log_iter(self, run: MagicMock) -> None:
421513
scheduler = create_scheduler("foo")

0 commit comments

Comments
 (0)