-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathslurm_run.py
192 lines (179 loc) · 7.19 KB
/
slurm_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable
from adaptive_scheduler.scheduler import SLURM, slurm_partitions
from adaptive_scheduler.utils import _get_default_args
from .common import console
from .run_manager import RunManager
if TYPE_CHECKING:
import adaptive
from adaptive.learner.base_learner import LearnerType
from adaptive_scheduler.utils import _DATAFRAME_FORMATS, EXECUTOR_TYPES, GoalTypes
def slurm_run(
learners: list[adaptive.BaseLearner],
fnames: list[str] | list[Path],
*,
partition: str | tuple[str, ...] | None = None,
nodes: int | tuple[int, ...] = 1,
cores_per_node: int | tuple[int, ...] | None = None,
goal: GoalTypes | None = None,
folder: str | Path = "",
name: str = "adaptive",
num_threads: int = 1,
save_interval: float = 300,
log_interval: float = 300,
cleanup_first: bool = True,
save_dataframe: bool = True,
dataframe_format: _DATAFRAME_FORMATS = "pickle",
periodic_callable: tuple[Callable[[str, LearnerType], None], float] | None = None,
max_fails_per_job: int = 50,
max_simultaneous_jobs: int = 100,
exclusive: bool | tuple[bool, ...] = True,
executor_type: EXECUTOR_TYPES | tuple[EXECUTOR_TYPES, ...] = "process-pool",
extra_scheduler: list[str] | tuple[list[str], ...] | None = None,
extra_run_manager_kwargs: dict[str, Any] | None = None,
extra_scheduler_kwargs: dict[str, Any] | None = None,
initializers: list[Callable[[], None]] | None = None,
) -> RunManager:
"""Run adaptive on a SLURM cluster.
``cores``, ``nodes``, ``cores_per_node``, ``extra_scheduler``, ``executor_type``,
``extra_script``, ``exclusive``, ``extra_env_vars`` and ``partition`` can be
either a single value or a tuple of values. If a tuple is given, then the
length of the tuple should be the same as the number of learners (jobs) that
are run. This allows for different resources for different jobs.
Parameters
----------
learners
A list of learners.
fnames
A list of filenames to save the learners.
partition
The partition to use. If None, then the default partition will be used.
(The one marked with a * in `sinfo`). Use
`adaptive_scheduler.scheduler.slurm_partitions` to see the
available partitions.
nodes
The number of nodes to use.
cores_per_node
The number of cores per node to use. If None, then all cores on the partition
will be used.
goal
The goal of the adaptive run. If None, then the run will continue
indefinitely.
folder
The folder to save the adaptive_scheduler files such as logs, database,
and ``.sbatch`` files in.
name
The name of the job.
num_threads
The number of threads to use.
save_interval
The interval at which to save the learners.
log_interval
The interval at which to log the status of the run.
cleanup_first
Whether to clean up the folder before starting the run.
save_dataframe
Whether to save the `pandas.DataFrame`s with the learners data.
dataframe_format
The format to save the `pandas.DataFrame`s in. See
`adaptive_scheduler.utils.save_dataframes` for more information.
periodic_callable
A tuple of a callable and an interval in seconds. The callable will be called
every `interval` seconds and takes the learner name and learner as arguments.
max_fails_per_job
The maximum number of times a job can fail before it is cancelled.
max_simultaneous_jobs
The maximum number of simultaneous jobs.
executor_type
The executor that is used, by default `concurrent.futures.ProcessPoolExecutor` is used.
One can use ``"ipyparallel"``, ``"dask-mpi"``, ``"mpi4py"``,
``"loky"``, ``"sequential"``, or ``"process-pool"``.
exclusive
Whether to use exclusive nodes, adds ``"--exclusive"`` if True.
extra_scheduler
Extra ``#SLURM`` (depending on scheduler type)
arguments, e.g. ``["--exclusive=user", "--time=1"]`` or a tuple of lists,
e.g. ``(["--time=10"], ["--time=20"]])`` for two jobs.
extra_run_manager_kwargs
Extra keyword arguments to pass to the `RunManager`.
extra_scheduler_kwargs
Extra keyword arguments to pass to the `adaptive_scheduler.scheduler.SLURM`.
initializers
List of functions that are called before the job starts, can populate
a cache.
Returns
-------
RunManager
"""
if partition is None:
partitions = slurm_partitions()
assert isinstance(partitions, dict)
partition, ncores = next(iter(partitions.items()))
console.log(
f"Using default partition {partition} (The one marked"
f" with a '*' in `sinfo`) with {ncores} cores."
" Use `adaptive_scheduler.scheduler.slurm_partitions`"
" to see the available partitions.",
)
if executor_type == "process-pool" and (
nodes > 1 if isinstance(nodes, int) else any(n > 1 for n in nodes)
):
msg = (
"process-pool can maximally use a single node,"
" use e.g., ipyparallel for multi node.",
)
raise ValueError(msg)
folder = Path(folder)
folder.mkdir(parents=True, exist_ok=True)
if cores_per_node is None:
partitions = slurm_partitions()
assert isinstance(partitions, dict)
cores_per_node = (
tuple(partitions[p] for p in partition) # type: ignore[misc]
if isinstance(partition, tuple)
else partitions[partition]
)
if extra_scheduler_kwargs is None:
extra_scheduler_kwargs = {}
if extra_scheduler is not None:
# "extra_scheduler" used to be passed via the extra_scheduler_kwargs
# this ensures backwards compatibility
assert "extra_scheduler" not in extra_scheduler_kwargs
extra_scheduler_kwargs["extra_scheduler"] = extra_scheduler
slurm_kwargs = dict(
_get_default_args(SLURM),
nodes=nodes,
cores_per_node=cores_per_node,
partition=partition,
log_folder=folder / "logs",
batch_folder=folder / "batch_scripts",
executor_type=executor_type,
num_threads=num_threads,
exclusive=exclusive,
**extra_scheduler_kwargs,
)
scheduler = SLURM(**slurm_kwargs)
# Below are the defaults for the RunManager
kw = dict(
_get_default_args(RunManager),
scheduler=scheduler,
learners=learners,
fnames=fnames,
goal=goal,
save_interval=save_interval,
log_interval=log_interval,
move_old_logs_to=folder / "old_logs",
db_fname=folder / f"{name}.db.json",
job_name=name,
cleanup_first=cleanup_first,
save_dataframe=save_dataframe,
dataframe_format=dataframe_format,
periodic_callable=periodic_callable,
max_fails_per_job=max_fails_per_job,
max_simultaneous_jobs=max_simultaneous_jobs,
initializers=initializers,
)
if extra_run_manager_kwargs is None:
extra_run_manager_kwargs = {}
return RunManager(**dict(kw, **extra_run_manager_kwargs))