Skip to content

Commit c1fc638

Browse files
charles-cowart and charlie
authored
Added restart functionality. (#79)
* Added restart functionality. * Added a note to the wetlab when a job is restarted. --------- Co-authored-by: charlie <[email protected]>
1 parent bf0d6c9 commit c1fc638

File tree

4 files changed

+99
-30
lines changed

4 files changed

+99
-30
lines changed

qp_klp/Amplicon.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
class Amplicon(Step):
88
def __init__(self, pipeline, master_qiita_job_id,
99
status_update_callback=None,
10-
lane_number=None):
10+
lane_number=None, is_restart=False):
1111
super().__init__(pipeline,
1212
master_qiita_job_id,
1313
status_update_callback,
14-
lane_number)
14+
lane_number,
15+
is_restart=is_restart)
1516

1617
if pipeline.pipeline_type != Step.AMPLICON_TYPE:
1718
raise ValueError("Cannot create an Amplicon run using a "

qp_klp/Metagenomic.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33

44
class Metagenomic(Step):
55
def __init__(self, pipeline, master_qiita_job_id,
6-
status_update_callback=None, lane_number=None):
6+
status_update_callback=None, lane_number=None,
7+
is_restart=False):
78
super().__init__(pipeline,
89
master_qiita_job_id,
910
status_update_callback,
10-
lane_number)
11+
lane_number,
12+
is_restart)
1113

1214
if pipeline.pipeline_type not in Step.META_TYPES:
1315
raise ValueError("Cannot instantiate Metagenomic object from "

qp_klp/Step.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class Step:
7676

7777
def __init__(self, pipeline, master_qiita_job_id,
7878
status_update_callback=None,
79-
lane_number=None):
79+
lane_number=None, is_restart=False):
8080
if pipeline is None:
8181
raise ValueError("A pipeline object is needed to initialize Step")
8282

@@ -89,6 +89,8 @@ def __init__(self, pipeline, master_qiita_job_id,
8989
f'{self.pipeline.run_id}_{self.lane_number}'
9090
self.master_qiita_job_id = master_qiita_job_id
9191

92+
self.is_restart = is_restart
93+
9294
if status_update_callback is not None:
9395
self.update_callback = status_update_callback.update_job_status
9496
else:
@@ -893,28 +895,42 @@ def precheck(self, qclient):
893895
if msgs:
894896
raise PipelineError('\n'.join(msgs))
895897

896-
def execute_pipeline(self, qclient, increment_status, update=True):
898+
def execute_pipeline(self, qclient, increment_status, update=True,
899+
skip_steps=[]):
897900
'''
898901
Executes steps of pipeline in proper sequence.
899902
:param qclient: Qiita client library or equivalent.
900903
:param increment_status: callback function to increment status.
901904
:param update: Set False to prevent updates to Qiita.
902905
:return: None
903906
'''
907+
# this is performed even in the event of a restart.
904908
self.generate_special_map(qclient)
905909

910+
# even if a job is being skipped, it's being skipped because it was
911+
# determined that it already completed successfully. Hence,
912+
# increment the status because we are still iterating through them.
913+
906914
increment_status()
907-
self.convert_bcl_to_fastq()
915+
if "ConvertJob" not in skip_steps:
916+
self.convert_bcl_to_fastq()
908917

909918
increment_status()
910-
self.quality_control()
919+
if "QCJob" not in skip_steps:
920+
self.quality_control()
911921

912922
increment_status()
913-
self.generate_reports()
923+
if "FastQCJob" not in skip_steps:
924+
self.generate_reports()
914925

915926
increment_status()
916-
self.generate_prep_file()
927+
if "GenPrepFileJob" not in skip_steps:
928+
self.generate_prep_file()
917929

930+
# for now, simply re-run any line below as if it was a new job, even
931+
# for a restart. functionality is idempotent, except for the
932+
# registration of new preps in Qiita. These will simply be removed
933+
# manually.
918934
increment_status()
919935
self.sifs = self.generate_sifs(qclient)
920936

qp_klp/klp.py

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
from qp_klp.Metagenomic import Metagenomic
1313
from qp_klp.Step import Step
1414
from os import makedirs
15-
from os.path import join
15+
from os.path import join, split, exists
1616
from sequence_processing_pipeline.Pipeline import Pipeline
1717
from sequence_processing_pipeline.PipelineError import PipelineError
18+
from sequence_processing_pipeline.ConvertJob import ConvertJob
19+
from metapool import load_sample_sheet
1820

1921

2022
CONFIG_FP = environ["QP_KLP_CONFIG_FP"]
@@ -70,11 +72,46 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
7072
bool, list, str
7173
The results of the job
7274
"""
73-
# available fields for parameters are:
74-
# run_identifier, sample_sheet, content_type, filename, lane_number
75-
run_identifier = parameters.pop('run_identifier')
76-
user_input_file = parameters.pop('sample_sheet')
77-
lane_number = parameters.pop('lane_number')
75+
# Assume that for a job to be considered a restart, there must be work
76+
# performed worth re-starting for. Since the working directory for each
77+
# step is created only if the previous steps were successful, testing
78+
# for the presence of them ensures that n-1 steps exist and were
79+
# successful.
80+
81+
# at minimum, ConvertJob needs to have been successful.
82+
is_restart = True if exists(join(out_dir, 'NuQCJob')) else False
83+
84+
if is_restart:
85+
# Assume ConvertJob directory exists and parse the job-script found
86+
# there. If this is a restart, we won't be given the run-identifier,
87+
# the lane number, and the sample-sheet as input parameters.
88+
some_path = join(out_dir, 'ConvertJob', 'ConvertJob.sh')
89+
result = ConvertJob.parse_job_script(some_path)
90+
run_identifier = split(result['out_directory'])[-1]
91+
user_input_file = result['sample_sheet_path']
92+
sheet = load_sample_sheet(user_input_file)
93+
# on Amplicon runs, lane_number is always 1, and this will be
94+
# properly reflected in the dummy sample-sheet as well.
95+
lane_number = sheet.get_lane_number()
96+
97+
# check if sample-sheet is a dummy-sample-sheet. If this is an
98+
# Amplicon run, then Assay type will be 'TruSeq HT' and Chemistry
99+
# will be 'Amplicon'. For now, raise Error on restarting an
100+
# Amplicon run so we don't have to search for the pre-prep file.
101+
if sheet.Header['Assay'] == 'TruSeq HT' and \
102+
sheet.Header['Chemistry'] == 'Amplicon':
103+
raise ValueError("Restarting Amplicon jobs currently unsupported")
104+
105+
# add a note for the wetlab that this job was restarted.
106+
with open(join(out_dir, 'notes.txt'), 'w') as f:
107+
f.write("This job was restarted.\n"
108+
"failed_samples.html may contain incorrect data.\n")
109+
else:
110+
# available fields for parameters are:
111+
# run_identifier, sample_sheet, content_type, filename, lane_number
112+
run_identifier = parameters.pop('run_identifier')
113+
user_input_file = parameters.pop('sample_sheet')
114+
lane_number = parameters.pop('lane_number')
78115

79116
if {'body', 'content_type', 'filename'} != set(user_input_file):
80117
return False, None, ("This doesn't appear to be a valid sample sheet "
@@ -86,19 +123,14 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
86123
# replace any whitespace in the filename with underscores
87124
uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
88125

89-
# save raw data to file
90-
with open(uif_path, 'w') as f:
91-
f.write(user_input_file['body'])
126+
if is_restart:
127+
pass
128+
else:
129+
# save raw data to file
130+
with open(uif_path, 'w') as f:
131+
f.write(user_input_file['body'])
92132

93133
if Pipeline.is_sample_sheet(uif_path):
94-
# if file follows basic sample-sheet format, then it is most likely
95-
# a sample-sheet, even if it's an invalid one.
96-
97-
# a valid sample-sheet is going to have one and only one occurrence of
98-
# 'Assay,Metagenomic' or 'Assay,Metatranscriptomic'. Anything else is
99-
# an error.
100-
101-
# works best from file
102134
with open(uif_path, 'r') as f:
103135
assay = [x for x in f.readlines() if 'Assay' in x]
104136

@@ -141,6 +173,21 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
141173
status_line = StatusUpdate(qclient, job_id, msgs)
142174
status_line.update_current_message()
143175

176+
skip_steps = []
177+
if is_restart:
178+
# figure out what actually needs to be skipped if restarting:
179+
if exists(join(out_dir, 'NuQCJob')):
180+
skip_steps.append('ConvertJob')
181+
182+
if exists(join(out_dir, 'FastQCJob')):
183+
skip_steps.append('NuQCJob')
184+
185+
if exists(join(out_dir, 'GenPrepFileJob')):
186+
skip_steps.append('FastQCJob')
187+
188+
if exists(join(out_dir, 'cmds.log')):
189+
skip_steps.append('GenPrepFileJob')
190+
144191
try:
145192
pipeline = Step.generate_pipeline(pipeline_type,
146193
uif_path,
@@ -157,10 +204,12 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
157204
try:
158205
if pipeline.pipeline_type in Step.META_TYPES:
159206
step = Metagenomic(
160-
pipeline, job_id, status_line, lane_number)
207+
pipeline, job_id, status_line, lane_number,
208+
is_restart=is_restart)
161209
else:
162210
step = Amplicon(
163-
pipeline, job_id, status_line, lane_number)
211+
pipeline, job_id, status_line, lane_number,
212+
is_restart=is_restart)
164213

165214
status_line.update_current_message()
166215

@@ -170,7 +219,8 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
170219
# files into uploads directory. Useful for testing.
171220
step.execute_pipeline(qclient,
172221
status_line.update_current_message,
173-
update=True)
222+
update=True,
223+
skip_steps=skip_steps)
174224

175225
status_line.update_current_message()
176226

0 commit comments

Comments
 (0)