Fix --printJobInfo #4709

Merged · 13 commits · Jan 9, 2024
4 changes: 2 additions & 2 deletions src/toil/common.py
@@ -592,9 +592,9 @@ def create_config_dict_from_parser(parser: ArgumentParser) -> CommentedMap:


def parser_with_common_options(
-    provisioner_options: bool = False, jobstore_option: bool = True
+    provisioner_options: bool = False, jobstore_option: bool = True, prog: Optional[str] = None
) -> ArgParser:
-    parser = ArgParser(prog="Toil", formatter_class=ArgumentDefaultsHelpFormatter)
+    parser = ArgParser(prog=prog or "Toil", formatter_class=ArgumentDefaultsHelpFormatter)

if provisioner_options:
add_provisioner_options(parser)
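Side note on the new prog parameter: argparse interpolates prog into its usage and help text, so passing the real subcommand name makes --help output match what the user actually typed instead of advertising a generic "Toil" program. A minimal sketch of the effect (assuming parser_with_common_options is imported from toil.common, as in this diff):

```python
from toil.common import parser_with_common_options

# Build a parser the way toilClean.py now does.
parser = parser_with_common_options(jobstore_option=True, prog="toil clean")

# argparse uses prog when rendering usage/help:
parser.print_usage()  # "usage: toil clean [-h] ..." rather than "usage: Toil ..."
```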
2 changes: 1 addition & 1 deletion src/toil/job.py
@@ -811,7 +811,7 @@ def makeString(x: Union[str, bytes, None]) -> str:
# them behind if we die right after saving the JobDescription.
#
# This will be empty at all times except when a new version of a job is
- # in the process of being committed. 
+ # in the process of being committed.
self.filesToDelete = []

# Holds JobStore Job IDs of the jobs that have been chained into this
5 changes: 3 additions & 2 deletions src/toil/jobStores/abstractJobStore.py
@@ -27,6 +27,7 @@
Callable,
ContextManager,
Dict,
Iterable,
Iterator,
List,
Optional,
@@ -607,7 +608,7 @@ def read_from_url(cls, src_uri: str, writable: IO[bytes]) -> Tuple[int, bool]:
parseResult = urlparse(src_uri)
otherCls = cls._findJobStoreForUrl(parseResult)
return otherCls._read_from_url(parseResult, writable)

@classmethod
def open_url(cls, src_uri: str) -> IO[bytes]:
"""
@@ -621,7 +622,7 @@ def open_url(cls, src_uri: str) -> IO[bytes]:
parseResult = urlparse(src_uri)
otherCls = cls._findJobStoreForUrl(parseResult)
return otherCls._open_url(parseResult)

@classmethod
@abstractmethod
def _url_exists(cls, url: ParseResult) -> bool:
87 changes: 85 additions & 2 deletions src/toil/jobStores/fileJobStore.py
@@ -23,7 +23,7 @@
import time
import uuid
from contextlib import contextmanager
-from typing import IO, Iterator, List, Optional, Union, overload
+from typing import IO, Iterable, Iterator, List, Optional, Union, overload
from urllib.parse import ParseResult, quote, unquote

if sys.version_info >= (3, 8):
@@ -499,7 +499,7 @@ def update_file(self, file_id, local_path):

atomic_copy(local_path, jobStoreFilePath)

-def read_file(self, file_id, local_path, symlink=False):
+def read_file(self, file_id: str, local_path: str, symlink: bool = False) -> None:
Review comment (Member): Thanks for adding the typing!
self._check_job_store_file_id(file_id)
jobStoreFilePath = self._get_file_path_from_id(file_id)
localDirPath = os.path.dirname(local_path)
@@ -716,6 +716,69 @@ def read_shared_file_stream(self, shared_file_name, encoding=None, errors=None):
else:
raise

def list_all_file_names(self, for_job: Optional[str] = None) -> Iterable[str]:
"""
Get all the file names (not file IDs) of files stored in the job store.

Used for debugging.

:param for_job: If set, restrict the list to files for a particular job.
"""

# TODO: Promote to AbstractJobStore.
# TODO: Include stats-and-logging files?

if for_job is not None:
# Run on one job
jobs = [for_job]
else:
# Run on all the jobs
jobs = []
# Not all the jobs that exist, though: we want all the jobs that
# have files. So look at the file directories, which mirror the
# job directories' structure.
for job_kind_dir in self._list_dynamic_spray_dir(self.jobFilesDir):
# First we sprayed all the job kinds over a tree
for job_instance_dir in self._list_dynamic_spray_dir(job_kind_dir):
# Then we sprayed the job instances over a tree
# And based on those we get the job name
job_id = self._get_job_id_from_files_dir(job_instance_dir)
jobs.append(job_id)

for name in os.listdir(self.sharedFilesDir):
# Announce all the shared files
yield name

for file_dir_path in self._list_dynamic_spray_dir(self.filesDir):
# Run on all the no-job files
for dir_file in os.listdir(file_dir_path):
# There ought to be just one file in here.
yield dir_file

for job_store_id in jobs:
# Files from _get_job_files_dir
job_files_dir = os.path.join(self.jobFilesDir, job_store_id)
if os.path.exists(job_files_dir):
for file_dir in os.listdir(job_files_dir):
# Each file is in its own directory
if file_dir == "cleanup":
# Except the cleanup directory which we do later.
continue
file_dir_path = os.path.join(job_files_dir, file_dir)
for dir_file in os.listdir(file_dir_path):
# There ought to be just one file in here.
yield dir_file

# Files from _get_job_files_cleanup_dir
job_cleanup_files_dir = os.path.join(job_files_dir, "cleanup")
if os.path.exists(job_cleanup_files_dir):
for file_dir in os.listdir(job_cleanup_files_dir):
# Each file is in its own directory
file_dir_path = os.path.join(job_cleanup_files_dir, file_dir)
for dir_file in os.listdir(file_dir_path):
# There ought to be just one file in here.
yield dir_file

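As a usage note, a minimal sketch of calling the new method while poking at a store by hand (the store path is a placeholder; resume() attaches to an existing store rather than creating a new one):

```python
from toil.jobStores.fileJobStore import FileJobStore

# Hypothetical job store left on disk by an earlier (failed) run.
store = FileJobStore("./my-jobstore")
store.resume()  # attach to the existing store

for name in store.list_all_file_names():
    print(name)
```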
def write_logs(self, msg):
# Temporary files are placed in the stats directory tree
tempStatsFileName = "stats" + str(uuid.uuid4().hex) + ".new"
Expand Down Expand Up @@ -763,6 +826,13 @@ def _get_job_id_from_dir(self, absPath):
"""
return absPath[len(self.jobsDir)+1:]

def _get_job_id_from_files_dir(self, absPath: str) -> str:
"""
:param absPath: The absolute path to a directory under self.jobFilesDir which holds a job's files.
:return: The corresponding job ID.
"""
return absPath[len(self.jobFilesDir)+1:]

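The slice here just strips the self.jobFilesDir prefix plus the joining slash. A tiny self-contained illustration with made-up paths (a real FileJobStore computes jobFilesDir itself):

```python
# Hypothetical paths, for illustration only.
job_files_dir = "/tmp/tree/files/for-job"
abs_path = "/tmp/tree/files/for-job/kind-explode/instance-abc123"

job_id = abs_path[len(job_files_dir) + 1:]  # drop the prefix and the "/"
assert job_id == "kind-explode/instance-abc123"
```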
def _get_job_file_name(self, jobStoreID):
"""
Return the path to the file containing the serialised JobDescription instance for the given
@@ -981,6 +1051,19 @@ def _walk_dynamic_spray_dir(self, root):
# Recurse
yield from self._walk_dynamic_spray_dir(childPath)

def _list_dynamic_spray_dir(self, root):
"""
For a directory tree filled in by _getDynamicSprayDir, yields each
highest-level file or directory *not* created by _getDynamicSprayDir
(i.e. the actual contents).
"""

for spray_dir in self._walk_dynamic_spray_dir(root):
for child in os.listdir(spray_dir):
if child not in self.validDirsSet:
# This is a real content item we are storing
yield os.path.join(spray_dir, child)

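To make the spray layout concrete, here is a hypothetical tree (the spray directory names are invented; the real ones come from self.validDirsSet) and what the method would yield for it:

```python
# root/
#   t0/           <- spray directory (name in validDirsSet): descended into
#     file-aaa/   <- real content: yielded
#     t1/         <- nested spray directory: descended into
#       file-bbb/ <- real content: yielded
#
# _list_dynamic_spray_dir(root) would yield:
#   root/t0/file-aaa
#   root/t0/t1/file-bbb
```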
def _job_directories(self):
"""
:rtype : an iterator to the temporary directories containing job
81 changes: 81 additions & 0 deletions src/toil/test/utils/toilDebugTest.py
@@ -18,6 +18,8 @@

import pytest

from toil.test import ToilTest

from toil.lib.resources import glob
from toil.test import slow
from toil.version import python
@@ -118,3 +120,82 @@ def testFetchJobStoreFiles() -> None:
os.makedirs(output_dir, exist_ok=True)
for symlink in (True, False):
fetchFiles(symLink=symlink, jobStoreDir=job_store_dir, outputDir=output_dir)

class DebugJobTest(ToilTest):
"""
Test the toil debug-job command.
"""

def _get_job_store_and_job_id(self):
"""
Get a job store and the ID of a failing job within it.
"""

# First make a job store.
job_store = os.path.join(self._createTempDir(), "tree")

logger.info("Running workflow that always fails")
try:
# Run an always-failing workflow
subprocess.check_call([
python,
os.path.abspath("src/toil/test/docs/scripts/example_alwaysfail.py"),
"--retryCount=0",
"--logCritical",
"--disableProgress=True",
job_store
], stderr=subprocess.DEVNULL)
raise RuntimeError("Failing workflow succeeded!")
except subprocess.CalledProcessError:
# Should fail to run
logger.info("Task failed successfully")
pass

# Get the job ID.
# TODO: This assumes a lot about the FileJobStore. Use the MessageBus instead?
job_id = "kind-explode/" + os.listdir(os.path.join(job_store, "jobs/kind-explode"))[0]

return job_store, job_id

def test_run_job(self):
"""
Make sure that we can use toil debug-job to try and run a job in-process.
"""

job_store, job_id = self._get_job_store_and_job_id()

logger.info("Trying to rerun job %s", job_id)

# Rerun the job, which should fail again
output = subprocess.check_output([
"toil",
"debug-job",
"--logDebug",
job_store,
job_id
], stderr=subprocess.STDOUT)
# Even if the job fails, the attempt to run it will succeed.
log = output.decode('utf-8')
assert "Boom!" in log, f"Did not find the expected exception message in: {log}"


def test_print_job_info(self):
"""
Make sure that we can use --printJobInfo to get information on a job from a job store.
"""

job_store, job_id = self._get_job_store_and_job_id()

logger.info("Trying to print job info for job %s", job_id)

# Print the job info and make sure that doesn't crash.
subprocess.check_call([
"toil",
"debug-job",
"--logDebug",
job_store,
"--printJobInfo",
job_id
])


2 changes: 1 addition & 1 deletion src/toil/utils/toilClean.py
@@ -22,7 +22,7 @@


def main() -> None:
-parser = parser_with_common_options(jobstore_option=True)
+parser = parser_with_common_options(jobstore_option=True, prog="toil clean")

options = parser.parse_args()
set_logging_from_options(options)