Added restart functionality. #79

Merged · 2 commits · Feb 26, 2024
5 changes: 3 additions & 2 deletions qp_klp/Amplicon.py
@@ -7,11 +7,12 @@
 class Amplicon(Step):
     def __init__(self, pipeline, master_qiita_job_id,
                  status_update_callback=None,
-                 lane_number=None):
+                 lane_number=None, is_restart=False):
         super().__init__(pipeline,
                          master_qiita_job_id,
                          status_update_callback,
-                         lane_number)
+                         lane_number,
+                         is_restart=is_restart)

         if pipeline.pipeline_type != Step.AMPLICON_TYPE:
             raise ValueError("Cannot create an Amplicon run using a "
6 changes: 4 additions & 2 deletions qp_klp/Metagenomic.py
@@ -3,11 +3,13 @@

 class Metagenomic(Step):
     def __init__(self, pipeline, master_qiita_job_id,
-                 status_update_callback=None, lane_number=None):
+                 status_update_callback=None, lane_number=None,
+                 is_restart=False):
         super().__init__(pipeline,
                          master_qiita_job_id,
                          status_update_callback,
-                         lane_number)
+                         lane_number,
+                         is_restart)

         if pipeline.pipeline_type not in Step.META_TYPES:
             raise ValueError("Cannot instantiate Metagenomic object from "
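Both subclasses make the same change: accept an is_restart flag that defaults to False and forward it to Step.__init__, so existing call sites keep working unchanged. (Amplicon forwards it by keyword, Metagenomic positionally; the two are equivalent given Step's signature.) A minimal, self-contained sketch of the pattern, using toy class names rather than the PR's actual code:

class BaseStep:
    # stands in for qp_klp.Step.Step
    def __init__(self, lane_number=None, is_restart=False):
        self.lane_number = lane_number
        self.is_restart = is_restart

class MetagenomicStep(BaseStep):
    # stands in for qp_klp.Metagenomic.Metagenomic
    def __init__(self, lane_number=None, is_restart=False):
        # positional and keyword forwarding behave identically here
        super().__init__(lane_number, is_restart=is_restart)

fresh = MetagenomicStep(lane_number=1)         # old call site; flag defaults off
restart = MetagenomicStep(1, is_restart=True)  # restart path opts in explicitly
assert fresh.is_restart is False and restart.is_restart is True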
28 changes: 22 additions & 6 deletions qp_klp/Step.py
@@ -76,7 +76,7 @@ class Step:

     def __init__(self, pipeline, master_qiita_job_id,
                  status_update_callback=None,
-                 lane_number=None):
+                 lane_number=None, is_restart=False):
         if pipeline is None:
             raise ValueError("A pipeline object is needed to initialize Step")

@@ -89,6 +89,8 @@ def __init__(self, pipeline, master_qiita_job_id,
             f'{self.pipeline.run_id}_{self.lane_number}'
         self.master_qiita_job_id = master_qiita_job_id

+        self.is_restart = is_restart
+
         if status_update_callback is not None:
             self.update_callback = status_update_callback.update_job_status
         else:

@@ -893,28 +895,42 @@ def precheck(self, qclient):
         if msgs:
             raise PipelineError('\n'.join(msgs))

-    def execute_pipeline(self, qclient, increment_status, update=True):
+    def execute_pipeline(self, qclient, increment_status, update=True,
+                         skip_steps=[]):
         '''
         Executes steps of pipeline in proper sequence.
         :param qclient: Qiita client library or equivalent.
         :param increment_status: callback function to increment status.
         :param update: Set False to prevent updates to Qiita.
         :return: None
         '''
+        # this is performed even in the event of a restart.
         self.generate_special_map(qclient)

+        # even if a job is being skipped, it's being skipped because it was
+        # determined that it already completed successfully. Hence,
+        # increment the status because we are still iterating through them.
+
         increment_status()
-        self.convert_bcl_to_fastq()
+        if "ConvertJob" not in skip_steps:
+            self.convert_bcl_to_fastq()

         increment_status()
-        self.quality_control()
+        if "QCJob" not in skip_steps:
+            self.quality_control()

         increment_status()
-        self.generate_reports()
+        if "FastQCJob" not in skip_steps:
+            self.generate_reports()

         increment_status()
-        self.generate_prep_file()
+        if "GenPrepFileJob" not in skip_steps:
+            self.generate_prep_file()

+        # for now, simply re-run any line below as if it was a new job, even
+        # for a restart. functionality is idempotent, except for the
+        # registration of new preps in Qiita. These will simply be removed
+        # manually.
         increment_status()
         self.sifs = self.generate_sifs(qclient)
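The guarded calls above keep the step sequence intact: a skipped job still advances the status counter, since skipping means the work already completed on a previous run. Two observations, hedged because the surrounding code isn't shown in full: the mutable default skip_steps=[] is shared across calls in Python (harmless if the list is never mutated inside the method, but skip_steps=None is the safer idiom), and the repeated guard pattern invites a table-driven form. A sketch of that equivalent structure, illustrative only:

def run_steps(step, increment_status, skip_steps=None):
    # skip_steps=None avoids Python's shared-mutable-default pitfall
    skip_steps = skip_steps or []
    jobs = [('ConvertJob', step.convert_bcl_to_fastq),
            ('QCJob', step.quality_control),
            ('FastQCJob', step.generate_reports),
            ('GenPrepFileJob', step.generate_prep_file)]
    for name, func in jobs:
        # a skipped job already succeeded on a prior run, so the status
        # counter still advances for it
        increment_status()
        if name not in skip_steps:
            func()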
90 changes: 70 additions & 20 deletions qp_klp/klp.py
@@ -12,9 +12,11 @@
 from qp_klp.Metagenomic import Metagenomic
 from qp_klp.Step import Step
 from os import makedirs
-from os.path import join
+from os.path import join, split, exists
 from sequence_processing_pipeline.Pipeline import Pipeline
 from sequence_processing_pipeline.PipelineError import PipelineError
+from sequence_processing_pipeline.ConvertJob import ConvertJob
+from metapool import load_sample_sheet


 CONFIG_FP = environ["QP_KLP_CONFIG_FP"]

@@ -70,11 +72,46 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     bool, list, str
         The results of the job
     """
-    # available fields for parameters are:
-    # run_identifier, sample_sheet, content_type, filename, lane_number
-    run_identifier = parameters.pop('run_identifier')
-    user_input_file = parameters.pop('sample_sheet')
-    lane_number = parameters.pop('lane_number')
+    # Assume that for a job to be considered a restart, there must be work
+    # performed worth re-starting for. Since the working directory for each
+    # step is created only if the previous steps were successful, testing
+    # for the presence of them ensures that n-1 steps exist and were
+    # successful.
+
+    # at minimum, ConvertJob needs to have been successful.
+    is_restart = True if exists(join(out_dir, 'NuQCJob')) else False
+
+    if is_restart:
+        # Assume ConvertJob directory exists and parse the job-script found
+        # there. If this is a restart, we won't be given the run-identifier,
+        # the lane number, and the sample-sheet as input parameters.
+        some_path = join(out_dir, 'ConvertJob', 'ConvertJob.sh')
+        result = ConvertJob.parse_job_script(some_path)
+        run_identifier = split(result['out_directory'])[-1]
+        user_input_file = result['sample_sheet_path']
+        sheet = load_sample_sheet(user_input_file)
+        # on Amplicon runs, lane_number is always 1, and this will be
+        # properly reflected in the dummy sample-sheet as well.
+        lane_number = sheet.get_lane_number()
+
+        # check if sample-sheet is a dummy-sample-sheet. If this is an
+        # Amplicon run, then Assay type will be 'TruSeq HT' and Chemistry
+        # will be 'Amplicon'. For now, raise Error on restarting an
+        # Amplicon run so we don't have to search for the pre-prep file.
+        if sheet.Header['Assay'] == 'TruSeq HT' and \
+                sheet.Header['Chemistry'] == 'Amplicon':
+            raise ValueError("Restarting Amplicon jobs currently unsupported")
+
+        # add a note for the wetlab that this job was restarted.
+        with open(join(out_dir, 'notes.txt'), 'w') as f:
+            f.write("This job was restarted.\n"
+                    "failed_samples.html may contain incorrect data.\n")
+    else:
+        # available fields for parameters are:
+        # run_identifier, sample_sheet, content_type, filename, lane_number
+        run_identifier = parameters.pop('run_identifier')
+        user_input_file = parameters.pop('sample_sheet')
+        lane_number = parameters.pop('lane_number')

     if {'body', 'content_type', 'filename'} != set(user_input_file):
         return False, None, ("This doesn't appear to be a valid sample sheet "

@@ -86,19 +123,14 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     # replace any whitespace in the filename with underscores
     uif_path = out_path(user_input_file['filename'].replace(' ', '_'))

-    # save raw data to file
-    with open(uif_path, 'w') as f:
-        f.write(user_input_file['body'])
+    if is_restart:
+        pass
+    else:
+        # save raw data to file
+        with open(uif_path, 'w') as f:
+            f.write(user_input_file['body'])

     if Pipeline.is_sample_sheet(uif_path):
-        # if file follows basic sample-sheet format, then it is most likely
-        # a sample-sheet, even if it's an invalid one.
-
-        # a valid sample-sheet is going to have one and only one occurrence of
-        # 'Assay,Metagenomic' or 'Assay,Metatranscriptomic'. Anything else is
-        # an error.
-
         # works best from file
         with open(uif_path, 'r') as f:
             assay = [x for x in f.readlines() if 'Assay' in x]

@@ -141,6 +173,21 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     status_line = StatusUpdate(qclient, job_id, msgs)
     status_line.update_current_message()

+    skip_steps = []
+    if is_restart:
+        # figure out what actually needs to be skipped if restarting:
+        if exists(join(out_dir, 'NuQCJob')):
+            skip_steps.append('ConvertJob')
+
+        if exists(join(out_dir, 'FastQCJob')):
+            skip_steps.append('NuQCJob')
+
+        if exists(join(out_dir, 'GenPrepFileJob')):
+            skip_steps.append('FastQCJob')
+
+        if exists(join(out_dir, 'cmds.log')):
+            skip_steps.append('GenPrepFileJob')
+
     try:
         pipeline = Step.generate_pipeline(pipeline_type,
                                           uif_path,

@@ -157,10 +204,12 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     try:
         if pipeline.pipeline_type in Step.META_TYPES:
             step = Metagenomic(
-                pipeline, job_id, status_line, lane_number)
+                pipeline, job_id, status_line, lane_number,
+                is_restart=is_restart)
         else:
             step = Amplicon(
-                pipeline, job_id, status_line, lane_number)
+                pipeline, job_id, status_line, lane_number,
+                is_restart=is_restart)

         status_line.update_current_message()

@@ -170,7 +219,8 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
         # files into uploads directory. Useful for testing.
         step.execute_pipeline(qclient,
                               status_line.update_current_message,
-                              update=True)
+                              update=True,
+                              skip_steps=skip_steps)

         status_line.update_current_message()
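Restart detection rests on the invariant spelled out in the comments: each job's working directory is created only once the previous step succeeds, so the most advanced artifact present (a job directory, or cmds.log at the very end) tells us how far the prior run got. The cascade of exists() checks is therefore a marker-to-skip mapping, condensed below as a sketch (the helper name infer_skip_steps is hypothetical, not part of the PR):

from os.path import exists, join

# each marker proves the step *before* it finished, so its presence lets
# that earlier step be skipped on restart; order mirrors the pipeline
RESTART_MARKERS = [('NuQCJob', 'ConvertJob'),
                   ('FastQCJob', 'NuQCJob'),
                   ('GenPrepFileJob', 'FastQCJob'),
                   ('cmds.log', 'GenPrepFileJob')]

def infer_skip_steps(out_dir):
    return [step for marker, step in RESTART_MARKERS
            if exists(join(out_dir, marker))]

# e.g. with NuQCJob/ and FastQCJob/ present but GenPrepFileJob/ absent,
# this returns ['ConvertJob', 'NuQCJob'] and the run resumes at FastQCJob

One apparent mismatch worth flagging: klp.py appends 'NuQCJob' to skip_steps, but execute_pipeline tests for "QCJob" before calling quality_control, so the quality-control step would still re-run on restart unless the names are reconciled elsewhere.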