tweak execution setup to accommodate multiple levels of theory
svandenhaute committed May 28, 2024
1 parent 9dde204 commit dc1b0e6
Showing 7 changed files with 147 additions and 118 deletions.
26 changes: 8 additions & 18 deletions configs/hortense.yaml
@@ -1,47 +1,37 @@
 ---
-container:
-  engine: "apptainer"
-  uri: "oras://ghcr.io/molmod/psiflow:develop_python3.10_cuda"
 ModelEvaluation:
   cores_per_worker: 12
   gpu: True
   max_simulation_time: 20
-  SlurmProvider:
+  slurm:
     partition: "gpu_rome_a100"
     account: "2023_070"
-    nodes_per_block: 1      # each block fits on (less than) one node
-    cores_per_node: 48      # number of cores per slurm job
-    init_blocks: 0          # initialize a block at the start of the workflow
-    max_blocks: 1           # do not use more than one block
-    walltime: "12:00:00"    # walltime per block
-    exclusive: false        # rest of compute node free to use
+    nodes_per_block: 1
+    cores_per_node: 48
+    max_blocks: 1
+    walltime: "12:00:00"
     scheduler_options: "#SBATCH --clusters=dodrio\n#SBATCH --gpus=4\n"
 ModelTraining:
   cores_per_worker: 12
   gpu: true
   max_training_time: 40
-  SlurmProvider:
+  slurm:
     partition: "gpu_rome_a100"
     account: "2023_070"
     nodes_per_block: 1
     cores_per_node: 12
-    init_blocks: 0
     max_blocks: 1
     walltime: "12:00:00"
-    exclusive: false
     scheduler_options: "#SBATCH --clusters=dodrio\n#SBATCH --gpus=1\n"
-ReferenceEvaluation:
+CP2K:
   cores_per_worker: 64
   max_evaluation_time: 30
-  SlurmProvider:
+  slurm:
     partition: "cpu_rome"
     account: "starting_2024_030"
     nodes_per_block: 1
     cores_per_node: 64
-    init_blocks: 0
-    min_blocks: 0
     max_blocks: 50
     walltime: "06:00:00"
-    exclusive: false
     scheduler_options: "#SBATCH --clusters=dodrio\n"
 ...
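
For orientation, provider selection under the new layout can be sketched in a few lines of plain Python. This mirrors the from_config logic in psiflow/execution.py further down; it is an illustration, not the psiflow loader itself, and assumes parsl and pyyaml are installed:

import yaml  # pyyaml, assumed available
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider, SlurmProvider

section = yaml.safe_load("""
cores_per_worker: 12
gpu: true
slurm:
  partition: "gpu_rome_a100"
  account: "2023_070"
  nodes_per_block: 1
  cores_per_node: 48
""")

# a "slurm" block selects SlurmProvider; otherwise fall back to LocalProvider
if "slurm" in section:
    provider = SlurmProvider(launcher=SimpleLauncher(), **section.pop("slurm"))
else:
    provider = LocalProvider(launcher=SimpleLauncher())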
18 changes: 16 additions & 2 deletions configs/threadpool.yaml
@@ -8,9 +8,23 @@ ModelTraining:
   gpu: true
   use_threadpool: true
   max_training_time: 1
-ReferenceEvaluation:
+CP2K:
   cores_per_worker: 2
   max_evaluation_time: 0.3
-  mpi_command: 'mpirun -np 2 -x OMP_NUM_THREADS=1'  # cp2k on conda-forge comes with OpenMPI (not MPICH as in container)
+  launch_command: 'mpirun -np 2 -x OMP_NUM_THREADS=1 cp2k.psmp -i cp2k.inp'
   use_threadpool: true
+CP2K_container:
+  cores_per_worker: 2
+  max_evaluation_time: 0.3
+  launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/cp2k:2023.2 /usr/local/bin/entrypoint.sh mpirun -np 2 -x OMP_NUM_THREADS=1 cp2k.psmp -i cp2k.inp'
+  use_threadpool: true
+GPAW:
+  cores_per_worker: 2
+  max_evaluation_time: 0.3
+  use_threadpool: true
+GPAW_container:
+  cores_per_worker: 2
+  max_evaluation_time: 0.3
+  launch_command: 'apptainer exec -e --no-init oras://ghcr.io/molmod/gpaw:24.1 /opt/entry.sh mpirun -np 2 gpaw python /opt/run_gpaw.py input.json'
+  use_threadpool: true
 ...
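
Every top-level key starting with "CP2K" or "GPAW" becomes a separate reference-evaluation definition, which is what lets several levels of theory coexist in one config. A small self-contained sketch of that dispatch, mirroring the key check in psiflow/execution.py below (variable names are illustrative):

config = {
    "CP2K": {"cores_per_worker": 2, "max_evaluation_time": 0.3},
    "CP2K_container": {"cores_per_worker": 2, "max_evaluation_time": 0.3},
    "GPAW": {"cores_per_worker": 2, "max_evaluation_time": 0.3},
}

# same prefix test as in from_config: key[:4] in ["CP2K", "GPAW"]
reference_sections = {
    key: config.pop(key) for key in list(config) if key[:4] in ["CP2K", "GPAW"]
}
for name, section in reference_sections.items():
    print(name, "->", section)  # one reference definition per level of theory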
174 changes: 89 additions & 85 deletions psiflow/execution.py
@@ -37,16 +37,16 @@ class ExecutionDefinition:
     def __init__(
         self,
         parsl_provider: ExecutionProvider,
-        gpu: bool = False,
-        cores_per_worker: int = 1,
-        use_threadpool: bool = False,
-        cpu_affinity: str = "block",
+        gpu: bool,
+        cores_per_worker: int,
+        use_threadpool: bool,
+        worker_prepend: str,
     ) -> None:
         self.parsl_provider = parsl_provider
         self.gpu = gpu
         self.cores_per_worker = cores_per_worker
         self.use_threadpool = use_threadpool
-        self.cpu_affinity = cpu_affinity
+        self.worker_prepend = worker_prepend
         self.name = self.__class__.__name__

     @property
@@ -71,9 +71,7 @@ def max_runtime(self):
         walltime = 1e9
         return walltime

-    def create_executor(
-        self, path: Path, htex_address: Optional[str] = None, **kwargs
-    ) -> ParslExecutor:
+    def create_executor(self, path: Path, **kwargs) -> ParslExecutor:
         if self.use_threadpool:
             executor = ThreadPoolExecutor(
                 max_threads=self.cores_per_worker,
@@ -91,13 +89,6 @@ def create_executor(
             if self.gpu:
                 worker_options.append("--gpus={}".format(self.max_workers))

-            # hacky; if the launcher is a WrappedLauncher, switch to SimpleLauncher
-            # and prepend the command to worker_executable
-            if isinstance(self.parsl_provider.launcher, WrappedLauncher):
-                prepend = self.parsl_provider.launcher.prepend
-                self.parsl_provider.launcher = SimpleLauncher()
-            else:
-                prepend = ""
             executor = WorkQueueExecutor(
                 label=self.name,
                 working_dir=str(path / self.name),
@@ -108,44 +99,53 @@ def create_executor(
                 max_retries=0,
                 coprocess=False,
                 worker_options=" ".join(worker_options),
-                worker_executable="{} work_queue_worker".format(prepend),
+                worker_executable="{} work_queue_worker".format(self.worker_prepend),
                 scaling_assume_core_slots_per_worker=cores,
             )
         return executor

     @classmethod
     def from_config(
         cls,
-        config_dict: dict,
+        gpu: bool = False,
+        cores_per_worker: int = 1,
+        use_threadpool: bool = False,
         container: Optional[dict] = None,
         **kwargs,
     ):
-        if "container" in config_dict:
-            container = config_dict.pop("container")  # only used once
-
-        # search for any section in the config which defines the Parsl ExecutionProvider
-        # if none are found, default to LocalProvider
-        provider_keys = list(filter(lambda k: "Provider" in k, config_dict.keys()))
-        if len(provider_keys) == 0:
+        # currently only checking for SLURM
+        if "slurm" in kwargs:
+            provider_cls = SlurmProvider
+            provider_kwargs = kwargs.get("slurm")  # do not allow empty dict
+        else:
             provider_cls = LocalProvider  # noqa: F405
-            provider_dict = {}
-        elif len(provider_keys) == 1:
-            provider_cls = getattr(sys.modules[__name__], provider_keys[0])
-            provider_dict = config_dict.pop(provider_keys[0])
+            provider_kwargs = kwargs.get("local", {})

         # if multi-node blocks are requested, make sure we're using SlurmProvider
-        if provider_dict.get("nodes_per_block", 1) > 1:
+        if provider_kwargs.get("nodes_per_block", 1) > 1:
             raise NotImplementedError

-        if container is not None:
-            gpu = config_dict.get("gpu", False)
-            launch_command = container_launch_command(gpu=gpu, **container)
-            launcher = WrappedLauncher(prepend=launch_command)
-        else:
-            launcher = SimpleLauncher()
+        if container is not None:
+            assert not use_threadpool
+            worker_prepend = container_launch_command(gpu=gpu, **container)
+        else:
+            worker_prepend = ""

         # initialize provider
-        parsl_provider = provider_cls(launcher=launcher, **provider_dict)
-        return cls(parsl_provider=parsl_provider, **config_dict)
+        parsl_provider = provider_cls(
+            launcher=SimpleLauncher(),
+            **provider_kwargs,
+        )
+        return cls(
+            parsl_provider=parsl_provider,
+            gpu=gpu,
+            use_threadpool=use_threadpool,
+            worker_prepend=worker_prepend,
+            cores_per_worker=cores_per_worker,
+            **kwargs,
+        )


 @typeguard.typechecked
@@ -251,57 +251,53 @@ def wq_resources(self):
 class ReferenceEvaluation(ExecutionDefinition):
     def __init__(
         self,
-        name: Optional[str] = None,
-        mpi_command: Optional[str] = None,
-        cpu_affinity: str = "none",  # better default for cp2k
+        name: str,
+        launch_command: Optional[str] = None,
         max_evaluation_time: Optional[float] = None,
-        cp2k_executable: str = "cp2k.psmp",
         **kwargs,
     ) -> None:
-        super().__init__(cpu_affinity=cpu_affinity, **kwargs)
-        if max_evaluation_time is not None:
-            assert max_evaluation_time * 60 < self.max_runtime
+        super().__init__(**kwargs)
+        self.name = name  # override name
+        if max_evaluation_time is None:
+            max_evaluation_time = self.max_runtime / 60
+        assert max_evaluation_time * 60 <= self.max_runtime
         self.max_evaluation_time = max_evaluation_time
-        self.cp2k_executable = cp2k_executable
-        if mpi_command is None:  # parse
+
+        if launch_command is None:
+            launch_command = self.default_launch_command
+        self.launch_command = launch_command
+
+    @property
+    def default_launch_command(self):
+        if self.name.startswith("CP2K"):
             ranks = self.cores_per_worker  # use nprocs = ncores, nthreads = 1
             mpi_command = "mpirun -np {} ".format(ranks)
             mpi_command += "-x OMP_NUM_THREADS=1 "  # cp2k runs best with these settings
             mpi_command += "--bind-to core --map-by core"  # set explicitly
-            self.mpi_command = mpi_command
-        if name is not None:
-            self.name = name  # if not None, the name of the reference class
+            command = "cp2k.psmp -i cp2k.inp"
+            return " ".join([mpi_command, command])
+        if self.name.startswith("GPAW"):
+            ranks = self.cores_per_worker  # use nprocs = ncores, nthreads = 1
+            mpi_command = "mpirun -np {} ".format(ranks)
+            mpi_command += "-x OMP_NUM_THREADS=1 "  # single-threaded MPI ranks
+            mpi_command += "--bind-to core --map-by core"  # set explicitly
+            script = "$(python -c 'import psiflow.reference.gpaw_; print(psiflow.reference.gpaw_.__file__)')"
+            return " ".join([mpi_command, "gpaw", "python", script])

-    def cp2k_command(self):
+    def command(self):
+        max_time = 0.9 * (60 * self.max_evaluation_time)
         command = " ".join(
             [
-                self.mpi_command,
-                self.cp2k_executable,
-                "-i cp2k.inp",
+                "timeout -s 9 {}s".format(max_time),
+                self.launch_command,
+                "|| true",
             ]
         )
-        if self.max_evaluation_time is not None:
-            max_time = 60 * self.max_evaluation_time
-            command = " ".join(
-                [
-                    "timeout -s 9 {}s".format(max_time),
-                    command,
-                    "|| true",
-                ]
-            )
         return command

-    def gpaw_command(self):
-        script = "$(python -c 'import psiflow.reference.gpaw_; print(psiflow.reference.gpaw_.__file__)')"
-        command_list = [self.mpi_command, "gpaw", "python", script]
-        if self.max_evaluation_time is not None:
-            max_time = 0.9 * (60 * self.max_evaluation_time)
-            command_list = [
-                "timeout -s 15 {}s".format(max_time),
-                *command_list,
-                "|| true",
-            ]
-        return " ".join(command_list)
+    # def gpaw_command(self):
+    #     if self.max_evaluation_time is not None:
+    #         max_time = 0.9 * (60 * self.max_evaluation_time)

     def wq_resources(self):
         if self.use_threadpool:
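
The single command() above replaces the separate cp2k_command() and gpaw_command() methods: whichever launch_command a definition ends up with is wrapped in a hard timeout at 90% of max_evaluation_time, and the trailing "|| true" keeps a killed evaluation from surfacing as a task failure. A standalone sketch of that wrapper (the function name is illustrative, not part of psiflow):

def wrap_with_timeout(launch_command: str, max_evaluation_time: float) -> str:
    max_time = 0.9 * (60 * max_evaluation_time)  # minutes to seconds, 10% safety margin
    return " ".join(
        [
            "timeout -s 9 {}s".format(max_time),  # SIGKILL once the budget is spent
            launch_command,
            "|| true",  # swallow the nonzero exit status of a killed run
        ]
    )

print(wrap_with_timeout("mpirun -np 2 cp2k.psmp -i cp2k.inp", 0.3))
# timeout -s 9 16.2s mpirun -np 2 cp2k.psmp -i cp2k.inp || true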
@@ -376,8 +372,11 @@ def from_config(
     internal_tasks_max_threads: int = 10,
     default_threads: int = 4,
     htex_address: Optional[str] = None,
-    container: Optional[dict] = None,
     zip_staging: Optional[bool] = None,
+    container_uri: Optional[str] = None,
+    container_engine: str = "apptainer",
+    container_addopts: str = " --no-eval -e --no-mount home -W /tmp --writable-tmpfs",
+    container_entrypoint: str = "/usr/local/bin/entry.sh",
     **kwargs,
 ) -> ExecutionContext:
     if path is None:
@@ -391,44 +390,49 @@ def from_config(
         name="parsl",
         level=getattr(logging, parsl_log_level),
     )
+    if container_uri is not None:
+        container = {
+            "uri": container_uri,
+            "engine": container_engine,
+            "addopts": container_addopts,
+            "entrypoint": container_entrypoint,
+        }
+    else:
+        container = None

     # create definitions
     model_evaluation = ModelEvaluation.from_config(
-        config_dict=kwargs.pop("ModelEvaluation", {}),
         container=container,
+        **kwargs.pop("ModelEvaluation", {}),
     )
     model_training = ModelTraining.from_config(
-        config_dict=kwargs.pop("ModelTraining", {}),
         container=container,
+        **kwargs.pop("ModelTraining", {}),
     )
     reference_evaluations = []  # reference evaluations might be class specific
     for key in list(kwargs.keys()):
-        if key.endswith("ReferenceEvaluation"):
-            config_dict = kwargs.pop(key)
-            config_dict["name"] = key
+        if key[:4] in ["CP2K", "GPAW"]:
+            config = kwargs.pop(key)
             reference_evaluation = ReferenceEvaluation.from_config(
-                config_dict=config_dict,
                 container=container,
+                name=key,
+                **config,
             )
             reference_evaluations.append(reference_evaluation)
     if len(reference_evaluations) == 0:
-        reference_evaluation = ReferenceEvaluation.from_config(
-            config_dict={},
-            container=container,
-        )
+        reference_evaluation = ReferenceEvaluation.from_config()
         reference_evaluations.append(reference_evaluation)
     definitions = [model_evaluation, model_training, *reference_evaluations]

     # create main parsl executors
+    if htex_address is None:
+        htex_address = address_by_hostname()
     executors = [d.create_executor(path=path) for d in definitions]

-    # create default executors
     if container is not None:
         launcher = WrappedLauncher(prepend=container_launch_command(**container))
     else:
         launcher = SimpleLauncher()
-    if htex_address is None:
-        htex_address = address_by_hostname()
     htex = HighThroughputExecutor(
         label="default_htex",
         address=htex_address,
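
The worker_prepend attribute introduced above is what ties these container settings to execution: the prefix built from the container dict is prepended to the work_queue_worker executable. How container_launch_command assembles that prefix is not part of this diff, so the sketch below is an assumption about its behavior, with illustrative names:

def launch_prefix(uri, engine, addopts, entrypoint, gpu=False):
    # assumption: something along these lines happens inside psiflow's
    # container_launch_command; --nv is apptainer's flag for exposing
    # NVIDIA GPUs inside the container
    gpu_flag = " --nv" if gpu else ""
    return "{} exec{}{} {} {}".format(engine, gpu_flag, addopts, uri, entrypoint)

worker_prepend = launch_prefix(
    uri="oras://ghcr.io/molmod/psiflow:develop_python3.10_cuda",
    engine="apptainer",
    addopts=" --no-eval -e --no-mount home -W /tmp --writable-tmpfs",
    entrypoint="/usr/local/bin/entry.sh",
)
print("{} work_queue_worker".format(worker_prepend))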