Skip to content

Commit

Permalink
Backport proper counting of cpu-seconds for subprocesses
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Feb 4, 2025
1 parent 6826527 commit 360f9be
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
30 changes: 21 additions & 9 deletions src/_ert/forward_model_runner/forward_model_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import socket
import sys
import time
import uuid
from collections.abc import Generator, Sequence
from datetime import datetime as dt
from pathlib import Path
Expand Down Expand Up @@ -186,21 +187,32 @@ def _run(self) -> Generator[Start | Exited | Running | None]:
exit_code = None

max_memory_usage = 0
max_cpu_seconds = 0
fm_step_pids = {int(process.pid)}
cpu_seconds_pr_pid: dict[str, float] = {}
while exit_code is None:
(memory_rss, cpu_seconds, oom_score, pids) = _get_processtree_data(process)
max_cpu_seconds = max(max_cpu_seconds, cpu_seconds or 0)
(memory_rss, cpu_seconds_snapshot, oom_score, pids) = _get_processtree_data(
process
)
fm_step_pids |= pids
max_memory_usage = max(memory_rss, max_memory_usage)
for pid, seconds in cpu_seconds_snapshot.items():
if cpu_seconds_pr_pid.get(str(pid), 0.0) <= seconds:
cpu_seconds_pr_pid[str(pid)] = seconds
else:
# cpu_seconds must be monotonely increasing. Since
# decreasing cpu_seconds was detected, it must be due to pid reuse
cpu_seconds_pr_pid[str(pid) + str(uuid.uuid4())] = (
cpu_seconds_pr_pid[str(pid)]
)
cpu_seconds_pr_pid[str(pid)] = seconds
yield Running(
self,
ProcessTreeStatus(
rss=memory_rss,
max_rss=max_memory_usage,
fm_step_id=self.index,
fm_step_name=self.job_data.get("name"),
cpu_seconds=max_cpu_seconds,
cpu_seconds=sum(cpu_seconds_pr_pid.values()),
oom_score=oom_score,
),
)
Expand Down Expand Up @@ -416,7 +428,7 @@ def ensure_file_handles_closed(file_handles: Sequence[io.TextIOWrapper | None])

def _get_processtree_data(
process: Process,
) -> tuple[int, float, int | None, set[int]]:
) -> tuple[int, dict[str, float], int | None, set[int]]:
"""Obtain the oom_score (the Linux kernel uses this number to
decide which process to kill first in out-of-memory siturations).
Expand All @@ -435,7 +447,7 @@ def _get_processtree_data(
oom_score = None
# A value of None means that we have no information.
memory_rss = 0
cpu_seconds = 0.0
cpu_seconds_pr_pid: dict[str, float] = {}
pids = set()
with contextlib.suppress(ValueError, FileNotFoundError):
oom_score = int(
Expand All @@ -448,7 +460,7 @@ def _get_processtree_data(
process.oneshot(),
):
memory_rss = process.memory_info().rss
cpu_seconds = process.cpu_times().user
cpu_seconds_pr_pid[str(process.pid)] = process.cpu_times().user

with contextlib.suppress(
NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError
Expand Down Expand Up @@ -476,5 +488,5 @@ def _get_processtree_data(
child.oneshot(),
):
memory_rss += child.memory_info().rss
cpu_seconds += child.cpu_times().user
return (memory_rss, cpu_seconds, oom_score, pids)
cpu_seconds_pr_pid[str(child.pid)] = child.cpu_times().user
return (memory_rss, cpu_seconds_pr_pid, oom_score, pids)
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def test_run_with_process_failing(mock_process, mock_popen, mock_check_executabl
@pytest.mark.usefixtures("use_tmpdir")
def test_cpu_seconds_can_detect_multiprocess():
"""Run a fm step that sets of two simultaneous processes that
each run for 2 second. We should be able to detect the total
cpu seconds consumed to be roughly 2 seconds.
each run for 1 and 2 seconds respectively. We should be able to detect
the total cpu seconds consumed to be roughly 3 seconds.
The test is flaky in that it tries to gather cpu_seconds data while
the subprocesses are running. On a loaded CPU this is not very robust,
Expand All @@ -64,8 +64,9 @@ def test_cpu_seconds_can_detect_multiprocess():
textwrap.dedent(
"""\
import time
import sys
now = time.time()
while time.time() < now + 2:
while time.time() < now + int(sys.argv[1]):
pass"""
)
)
Expand All @@ -75,8 +76,8 @@ def test_cpu_seconds_can_detect_multiprocess():
textwrap.dedent(
"""\
#!/bin/sh
python busy.py &
python busy.py"""
python busy.py 1 &
python busy.py 2"""
)
)
executable = os.path.realpath(scriptname)
Expand Down Expand Up @@ -180,7 +181,7 @@ def oneshot(self):

def test_cpu_seconds_for_process_with_children():
(_, cpu_seconds, _, _) = _get_processtree_data(MockedProcess(123))
assert cpu_seconds == 123 / 10.0 + 124 / 10.0
assert cpu_seconds == {"123": 12.3, "124": 12.4}


@pytest.mark.skipif(sys.platform.startswith("darwin"), reason="No oom_score on MacOS")
Expand Down

0 comments on commit 360f9be

Please sign in to comment.