Skip to content
This repository has been archived by the owner on Jul 18, 2024. It is now read-only.

Commit

Permalink
update docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
PythonFZ committed Aug 30, 2022
1 parent 89897b3 commit 86396b2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
12 changes: 5 additions & 7 deletions dask4dvc/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ def run(
Run 'dvc exp run --run-all' with full parallel execution using Dask distributed.
"""
# TODO rename to exp run
cleanup = cleanup

if name is None:
tmp_dirs = dask4dvc.dvc_handling.load_all_exp_to_tmp_dir()

Expand All @@ -111,7 +108,7 @@ def run(
for tmp_dir in tmp_dirs.values():
shutil.rmtree(tmp_dir)
answer = input(
"Press Enter to load experiments and close dask client (yes/no)"
"Press Enter to load experiments and close dask client (yes/no) "
)

if answer == "yes":
Expand All @@ -133,7 +130,7 @@ def run(


def version_callback(value: bool):
"""Get the installed zntrack version"""
"""Get the installed dask4dvc version"""
if value:
typer.echo(f"dask4dvc {dask4dvc.__version__}")
raise typer.Exit()
Expand All @@ -145,7 +142,8 @@ def main(
None, "--version", callback=version_callback, is_eager=True
),
):
"""
<update me>
"""Dask4DVC
Run the DVC graph or DVC experiments in parallel using dask.
"""
_ = version # this would be greyed out otherwise
18 changes: 13 additions & 5 deletions dask4dvc/dvc_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@


def get_dvc_graph(cwd=None) -> nx.DiGraph:
"""Use the dvc dag command to get the graph into networkx format"""
"""Use the dvc dag command to get the graph into networkx format
Running 'dvc dag' can be slow on bigger graphs.
"""
dot_string = subprocess.run(
["dvc", "dag", "--dot"], capture_output=True, check=True, cwd=cwd
)
Expand All @@ -31,6 +34,9 @@ def run_dvc_repro_in_cwd(node_name: str, cwd=None, deps=None) -> None:
cwd: str
working directory
deps: list[dask.distributed.Future]|Future
Any dependencies for the dask graph
Raises
-------
subprocess.CalledProcessError: if dvc cmd fails
Expand Down Expand Up @@ -96,7 +102,9 @@ def prepare_dvc_workspace(
return tmp_dir


def submit_dvc_stage(name, deps=None, cwd: pathlib.Path = None, cleanup: bool = True):
def submit_dvc_stage(
name: str, deps=None, cwd: pathlib.Path = None, cleanup: bool = True
):
"""Run a DVC stage
1. prepare a temporary workspace
Expand All @@ -107,7 +115,8 @@ def submit_dvc_stage(name, deps=None, cwd: pathlib.Path = None, cleanup: bool =
----------
name: str
Name of the Node
deps: any dependencies for Dask to build the graph
deps: list[dask.distributed.Future]|Future
any dependencies for Dask to build the graph
cwd: pathlib.Path
The working directory for the repro command.
Will be None for 'dvc repro' and set to a custom directory for e.g. 'dvc exp run'
Expand All @@ -130,12 +139,11 @@ def submit_dvc_stage(name, deps=None, cwd: pathlib.Path = None, cleanup: bool =


def load_all_exp_to_tmp_dir() -> typing.Dict[str, pathlib.Path]:
"""Load all queued expments into temporary directories each.
"""Load all queued experiments into temporary directories each.
Returns
-------
A dictionary of the created directories with the experiment names as keys.
"""
queued_exp = get_queued_exp_names()
tmp_dirs = {}
Expand Down
11 changes: 9 additions & 2 deletions dask4dvc/utils/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ def iterate_over_nodes(


def submit_to_dask(
client: dask.distributed.Client, node_pairs, cmd: typing.Callable, **kwargs
):
client: dask.distributed.Client,
node_pairs: typing.List[tuple],
cmd: typing.Callable,
**kwargs,
) -> typing.Dict[str, dask.distributed.Future]:
"""Use the node_pairs to map the DVC graph onto a dask graph.
Parameters
Expand All @@ -78,6 +81,10 @@ def submit_to_dask(
Returns
-------
dict: A dictionary with a unique submission id as key and the respective dask node
as value. The unique key is added for parallel execution of experiments with
the same Node name.
"""

task_name_map = {}
Expand Down

0 comments on commit 86396b2

Please sign in to comment.