Skip to content

Commit c0230b9

Browse files
authored
test full runs in WorkflowFactory.py (#131)
* inserting data to qiita for testing * testing creation of study in qiit * add sbatch * shopt * echo .profile * slurm * partition * scontrol create partition * mv scontrol * partitionname * sudo scontrol * add some prints * /usr/bin/sbatch * sudo * env * .local/bin/sbatch * ls * sbatch in conda * squeue * improve error display and running tests * sbatch * GITHUB_PATH * adding files to tests/bin * test_metagenomic_workflow_creation * adding _inject_data * fixing some tests * fixing other tests * copyfile -> touch() * more copyfile -> touch() * fix test_tellseq_workflow_creation * addressing @AmandaBirmingham comments * flake8 * addressing @AmandaBirmingham note about comments
1 parent f72766c commit c0230b9

File tree

13 files changed

+668
-581
lines changed

13 files changed

+668
-581
lines changed

src/qp_klp/Assays.py

Lines changed: 122 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import pandas as pd
99
from json import dumps
1010
from collections import defaultdict
11-
import re
1211

1312

1413
ASSAY_NAME_NONE = "Assay"
@@ -144,13 +143,133 @@ def _generate_artifact_name(self, prep_file_path):
144143
# this is a normal pre-prep or sample-sheet.
145144
return (a_name, False)
146145

146+
def execute_pipeline(self):
147+
'''
148+
Executes steps of pipeline in proper sequence.
149+
:return: None
150+
'''
151+
if not self.is_restart:
152+
self.pre_check()
153+
154+
# this is performed even in the event of a restart.
155+
self.generate_special_map()
156+
157+
# even if a job is being skipped, it's being skipped because it was
158+
# determined that it already completed successfully. Hence,
159+
# increment the status because we are still iterating through them.
160+
161+
self.update_status("Converting data", 1, 9)
162+
if "ConvertJob" not in self.skip_steps:
163+
# converting raw data to fastq depends heavily on the instrument
164+
# used to generate the run_directory. Hence this method is
165+
# supplied by the instrument mixin.
166+
self.convert_raw_to_fastq()
167+
self.integrate_results()
168+
self.generate_sequence_counts()
169+
170+
self.update_status("QC-ing reads", 2, 9)
171+
if "NuQCJob" not in self.skip_steps:
172+
self.qc_reads()
173+
174+
self.update_status("Generating reports", 3, 9)
175+
if "FastQCJob" not in self.skip_steps:
176+
# reports are currently implemented by the assay mixin. This is
177+
# only because metagenomic runs currently require a failed-samples
178+
# report to be generated. This is not done for amplicon runs since
179+
# demultiplexing occurs downstream of SPP.
180+
self.generate_reports()
181+
182+
self.update_status("Generating preps", 4, 9)
183+
if "GenPrepFileJob" not in self.skip_steps:
184+
self.generate_prep_file()
185+
186+
# moved final component of genprepfilejob outside of object.
187+
# obtain the paths to the prep-files generated by GenPrepFileJob
188+
# w/out having to recover full state.
189+
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')
190+
191+
self.has_replicates = False
192+
193+
prep_paths = []
194+
self.prep_file_paths = {}
195+
196+
for root, dirs, files in walk(tmp):
197+
for _file in files:
198+
# we are looing for .tsv files and we are only interested
199+
# in the string after the last _, which is the study_id
200+
if not _file.endswith('.tsv'):
201+
continue
202+
# continue if no underscore
203+
chunks = _file.rsplit('_', 1)
204+
if len(chunks) <= 1:
205+
continue
206+
# continue if no int after .
207+
qid = chunks[-1].split('.')[0]
208+
if not qid.isnumeric():
209+
continue
210+
if qid not in self.prep_file_paths:
211+
self.prep_file_paths[qid] = []
212+
213+
_path = abspath(join(root, _file))
214+
prep_paths.append(_path)
215+
self.prep_file_paths[qid].append(_path)
216+
217+
for _dir in dirs:
218+
if _dir == '1':
219+
# if PrepFiles contains the '1' directory, then it's a
220+
# given that this sample-sheet contains replicates.
221+
self.has_replicates = True
222+
223+
# currently imported from Assay although it is a base method. it
224+
# could be imported into Workflows potentially, since it is a post-
225+
# processing step. All pairings of assay and instrument type need to
226+
# generate prep-info files in the same format.
227+
self.overwrite_prep_files(prep_paths)
228+
229+
# for now, simply re-run any line below as if it was a new job, even
230+
# for a restart. functionality is idempotent, except for the
231+
# registration of new preps in Qiita. These will simply be removed
232+
# manually.
233+
234+
# post-processing steps are by default associated with the Workflow
235+
# class, since they deal with fastq files and Qiita, and don't depend
236+
# on assay or instrument type.
237+
self.update_status("Generating sample information", 5, 9)
238+
self.sifs = self.generate_sifs()
239+
240+
# post-processing step.
241+
self.update_status("Registering blanks in Qiita", 6, 9)
242+
if self.update:
243+
self.update_blanks_in_qiita()
244+
245+
self.update_status("Loading preps into Qiita", 7, 9)
246+
if self.update:
247+
self.update_prep_templates()
248+
249+
# before we load preps into Qiita we need to copy the fastq
250+
# files n times for n preps and correct the file-paths each
251+
# prep is pointing to.
252+
self.load_preps_into_qiita()
253+
254+
# before we pack the results, we need to generate the human-readable
255+
# report of samples lost in each step. The tracking is being done
256+
# within fsr (FailedSamplesRecord), in conjuction with Job.audit.
257+
self.fsr.generate_report()
258+
259+
self.update_status("Generating packaging commands", 8, 9)
260+
self.generate_commands()
261+
262+
self.update_status("Packaging results", 9, 9)
263+
if self.update:
264+
self.execute_commands()
265+
147266

148267
class Amplicon(Assay):
149268
AMPLICON_TYPE = 'Amplicon'
150269
AMPLICON_SUB_TYPES = {'16S', '18S', 'ITS'}
151270
assay_type = ASSAY_NAME_AMPLICON
152271

153-
def post_process_raw_fastq_output(self):
272+
def qc_reads(self):
154273
"""
155274
Post-process ConvertJob output into correct form for FastQCJob.
156275
"""
@@ -353,7 +472,7 @@ class MetaOmic(Assay):
353472
# MetaOmic does not have an assay_type of its own. It is defined by its
354473
# children.
355474

356-
def quality_control(self):
475+
def qc_reads(self):
357476
# because this is a mixin, assume containing object will contain
358477
# a pipeline object.
359478
config = self.pipeline.get_software_configuration('nu-qc')
@@ -521,111 +640,6 @@ def load_preps_into_qiita(self):
521640

522641
return df
523642

524-
def execute_pipeline(self):
525-
'''
526-
Executes steps of pipeline in proper sequence.
527-
:return: None
528-
'''
529-
self.pre_check()
530-
531-
self.generate_special_map()
532-
533-
self.update_status("Converting data", 1, 9)
534-
535-
self.convert_raw_to_fastq()
536-
537-
self.integrate_results()
538-
539-
self.generate_sequence_counts()
540-
541-
self.update_status("Performing quality control", 2, 9)
542-
self.quality_control()
543-
544-
self.update_status("Generating reports", 3, 9)
545-
self.generate_reports()
546-
547-
self.update_status("Generating preps", 4, 9)
548-
self.generate_prep_file()
549-
550-
# moved final component of genprepfilejob outside of object.
551-
# obtain the paths to the prep-files generated by GenPrepFileJob
552-
# w/out having to recover full state.
553-
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')
554-
555-
self.has_replicates = False
556-
557-
prep_paths = []
558-
self.prep_file_paths = {}
559-
560-
rematch = re.compile(
561-
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
562-
r"(?P<qid>[0-9]{5,6})\..\.tsv")
563-
564-
for root, dirs, files in walk(tmp):
565-
for _file in files:
566-
# breakup the prep-info-file into segments
567-
# (run-id, project_qid, other) and cleave
568-
# the qiita-id from the project_name.
569-
rer = rematch.match(_file)
570-
if rer is None:
571-
continue
572-
573-
_, _, qid = rer.groups()
574-
575-
if qid not in self.prep_file_paths:
576-
self.prep_file_paths[qid] = []
577-
578-
_path = abspath(join(root, _file))
579-
if _path.endswith('.tsv'):
580-
prep_paths.append(_path)
581-
self.prep_file_paths[qid].append(_path)
582-
583-
for _dir in dirs:
584-
if _dir == '1':
585-
# if PrepFiles contains the '1' directory, then it's a
586-
# given that this sample-sheet contains replicates.
587-
self.has_replicates = True
588-
589-
# currently imported from Assay although it is a base method. it
590-
# could be imported into Workflows potentially, since it is a post-
591-
# processing step. All pairings of assay and instrument type need to
592-
# generate prep-info files in the same format.
593-
self.overwrite_prep_files(prep_paths)
594-
595-
# for now, simply re-run any line below as if it was a new job, even
596-
# for a restart. functionality is idempotent, except for the
597-
# registration of new preps in Qiita. These will simply be removed
598-
# manually.
599-
600-
# post-processing steps are by default associated with the Workflow
601-
# class, since they deal with fastq files and Qiita, and don't depend
602-
# on assay or instrument type.
603-
self.update_status("Generating sample information", 5, 9)
604-
self.sifs = self.generate_sifs()
605-
606-
# post-processing step.
607-
self.update_status("Registering blanks in Qiita", 6, 9)
608-
if self.update:
609-
self.update_blanks_in_qiita()
610-
611-
self.update_status("Loading preps into Qiita", 7, 9)
612-
if self.update:
613-
self.update_prep_templates()
614-
615-
# before we load preps into Qiita we need to copy the fastq
616-
# files n times for n preps and correct the file-paths each
617-
# prep is pointing to.
618-
self.load_preps_into_qiita()
619-
620-
self.fsr.generate_report()
621-
622-
self.update_status("Generating packaging commands", 8, 9)
623-
self.generate_commands()
624-
625-
self.update_status("Packaging results", 9, 9)
626-
if self.update:
627-
self.execute_commands()
628-
629643

630644
class Metagenomic(MetaOmic):
631645
METAGENOMIC_TYPE = 'Metagenomic'
Lines changed: 0 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
from .Protocol import Illumina
2-
from os.path import join, abspath
3-
from os import walk
42
from sequence_processing_pipeline.Pipeline import Pipeline
53
from .Assays import Amplicon
64
from .Assays import ASSAY_NAME_AMPLICON
75
from .Workflows import Workflow
8-
import re
96

107

118
class StandardAmpliconWorkflow(Workflow, Amplicon, Illumina):
@@ -52,122 +49,3 @@ def __init__(self, **kwargs):
5249
"type bool")
5350

5451
self.update = kwargs['update_qiita']
55-
56-
def execute_pipeline(self):
57-
'''
58-
Executes steps of pipeline in proper sequence.
59-
:return: None
60-
'''
61-
if not self.is_restart:
62-
self.pre_check()
63-
64-
# this is performed even in the event of a restart.
65-
self.generate_special_map()
66-
67-
# even if a job is being skipped, it's being skipped because it was
68-
# determined that it already completed successfully. Hence,
69-
# increment the status because we are still iterating through them.
70-
71-
self.update_status("Converting data", 1, 9)
72-
if "ConvertJob" not in self.skip_steps:
73-
# converting raw data to fastq depends heavily on the instrument
74-
# used to generate the run_directory. Hence this method is
75-
# supplied by the instrument mixin.
76-
self.convert_raw_to_fastq()
77-
78-
self.update_status("Post-processing raw fasq output", 2, 9)
79-
if "NuQCJob" not in self.skip_steps:
80-
# there is no failed samples reporting for amplicon runs.
81-
self.post_process_raw_fastq_output()
82-
83-
self.update_status("Generating reports", 3, 9)
84-
if "FastQCJob" not in self.skip_steps:
85-
# reports are currently implemented by the assay mixin. This is
86-
# only because metagenomic runs currently require a failed-samples
87-
# report to be generated. This is not done for amplicon runs since
88-
# demultiplexing occurs downstream of SPP.
89-
self.generate_reports()
90-
91-
self.update_status("Generating preps", 4, 9)
92-
if "GenPrepFileJob" not in self.skip_steps:
93-
# preps are currently associated with array mixin, but only
94-
# because there are currently some slight differences in how
95-
# FastQCJob gets instantiated(). This could get moved into a
96-
# shared method, but probably still in Assay.
97-
self.generate_prep_file()
98-
99-
# moved final component of genprepfilejob outside of object.
100-
# obtain the paths to the prep-files generated by GenPrepFileJob
101-
# w/out having to recover full state.
102-
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')
103-
104-
self.has_replicates = False
105-
106-
prep_paths = []
107-
self.prep_file_paths = {}
108-
rematch = re.compile(
109-
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
110-
r"(?P<qid>[0-9]{5,6})\..\.tsv")
111-
112-
for root, dirs, files in walk(tmp):
113-
for _file in files:
114-
# breakup the prep-info-file into segments
115-
# (run-id, project_qid, other) and cleave
116-
# the qiita-id from the project_name.
117-
rer = rematch.match(_file)
118-
if rer is None:
119-
continue
120-
121-
_, _, qid = rer.groups()
122-
123-
if qid not in self.prep_file_paths:
124-
self.prep_file_paths[qid] = []
125-
126-
_path = abspath(join(root, _file))
127-
if _path.endswith('.tsv'):
128-
prep_paths.append(_path)
129-
self.prep_file_paths[qid].append(_path)
130-
131-
for _dir in dirs:
132-
if _dir == '1':
133-
# if PrepFiles contains the '1' directory, then it's a
134-
# given that this sample-sheet contains replicates.
135-
self.has_replicates = True
136-
137-
# currently imported from Assay although it is a base method. it
138-
# could be imported into Workflows potentially, since it is a post-
139-
# processing step. All pairings of assay and instrument type need to
140-
# generate prep-info files in the same format.
141-
self.overwrite_prep_files(prep_paths)
142-
143-
# for now, simply re-run any line below as if it was a new job, even
144-
# for a restart. functionality is idempotent, except for the
145-
# registration of new preps in Qiita. These will simply be removed
146-
# manually.
147-
148-
# post-processing steps are by default associated with the Workflow
149-
# class, since they deal with fastq files and Qiita, and don't depend
150-
# on assay or instrument type.
151-
self.update_status("Generating sample information", 5, 9)
152-
self.sifs = self.generate_sifs()
153-
154-
# post-processing step.
155-
self.update_status("Registering blanks in Qiita", 6, 9)
156-
if self.update:
157-
self.update_blanks_in_qiita()
158-
159-
self.update_status("Loading preps into Qiita", 7, 9)
160-
if self.update:
161-
self.update_prep_templates()
162-
163-
# before we load preps into Qiita we need to copy the fastq
164-
# files n times for n preps and correct the file-paths each
165-
# prep is pointing to.
166-
self.load_preps_into_qiita()
167-
168-
self.update_status("Generating packaging commands", 8, 9)
169-
self.generate_commands()
170-
171-
self.update_status("Packaging results", 9, 9)
172-
if self.update:
173-
self.execute_commands()

0 commit comments

Comments
 (0)