Skip to content

Commit 2d0f041

Browse files
authored
fixes after deploy 03.25 (#100)
* fixes after deploy 03.25
* MultiQCJob/multiqc
* flake8
* address @wasade comment
* add try/except to test
* ValueEror -> ValueError
* merging determine_steps_to_skip
* fix test
1 parent a5e9f04 commit 2d0f041

8 files changed

+175
-158
lines changed

qp_klp/Assays.py

+106-70
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
from sequence_processing_pipeline.NuQCJob import NuQCJob
55
from sequence_processing_pipeline.FastQCJob import FastQCJob
66
from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob
7+
from sequence_processing_pipeline.MultiQCJob import MultiQCJob
78
import pandas as pd
89
from json import dumps
910
from collections import defaultdict
11+
import re
1012

1113

1214
ASSAY_NAME_NONE = "Assay"
@@ -163,37 +165,6 @@ def post_process_raw_fastq_output(self):
163165
projects = [x['project_name'] for x in projects]
164166

165167
for project_name in projects:
166-
# copy the files from ConvertJob output to faked NuQCJob output
167-
# folder: $WKDIR/$RUN_ID/NuQCJob/$PROJ_NAME/amplicon
168-
output_folder = join(self.pipeline.output_path,
169-
'NuQCJob',
170-
project_name,
171-
# for legacy purposes, output folders are
172-
# either 'trimmed_sequences', 'amplicon', or
173-
# 'filtered_sequences'. Hence, this folder
174-
# is not defined using AMPLICON_TYPE as that
175-
# value may or may not equal the needed value.
176-
'amplicon')
177-
178-
makedirs(output_folder)
179-
180-
# get list of all raw output files to be copied.
181-
job_output = [join(self.raw_fastq_files_path, x) for x in
182-
listdir(self.raw_fastq_files_path)]
183-
184-
job_output = [x for x in job_output if isfile(x)]
185-
job_output = [x for x in job_output if x.endswith('fastq.gz')]
186-
187-
# NB: In this case, ensure the ONLY files that get copied are
188-
# Undetermined files, and this is what we expect for 16S runs.
189-
job_output = [x for x in job_output if
190-
basename(x).startswith('Undetermined')]
191-
192-
# copy the file
193-
for fastq_file in job_output:
194-
new_path = join(output_folder, basename(fastq_file))
195-
copyfile(fastq_file, new_path)
196-
197168
# FastQC expects the ConvertJob output to also be organized by
198169
# project. Since this would entail running the same ConvertJob
199170
# multiple times on the same input with just a name-change in
@@ -212,28 +183,66 @@ def post_process_raw_fastq_output(self):
212183
new_path = join(output_folder, basename(raw_fastq_file))
213184
copyfile(raw_fastq_file, new_path)
214185

186+
# copy the files from ConvertJob output to faked NuQCJob output
187+
# folder: $WKDIR/$RUN_ID/NuQCJob/$PROJ_NAME/amplicon
188+
output_folder = join(self.pipeline.output_path,
189+
'NuQCJob',
190+
project_name,
191+
# for legacy purposes, output folders are
192+
# either 'trimmed_sequences', 'amplicon', or
193+
# 'filtered_sequences'. Hence, this folder
194+
# is not defined using AMPLICON_TYPE as that
195+
# value may or may not equal the needed value.
196+
'amplicon')
197+
makedirs(output_folder)
198+
199+
# copy the file
200+
for fastq_file in job_output:
201+
new_path = join(output_folder, basename(fastq_file))
202+
copyfile(fastq_file, new_path)
203+
215204
def generate_reports(self):
216205
config = self.pipeline.get_software_configuration('fastqc')
217-
job = FastQCJob(self.pipeline.run_dir,
218-
self.pipeline.output_path,
219-
self.raw_fastq_files_path,
220-
join(self.pipeline.output_path, 'NuQCJob'),
221-
config['nprocs'],
222-
config['nthreads'],
223-
config['fastqc_executable_path'],
224-
config['modules_to_load'],
225-
self.master_qiita_job_id,
226-
config['queue'],
227-
config['nodes'],
228-
config['wallclock_time_in_minutes'],
229-
config['job_total_memory_limit'],
230-
config['job_pool_size'],
231-
config['multiqc_config_file_path'],
232-
config['job_max_array_length'],
233-
True)
206+
fcjob = FastQCJob(self.pipeline.run_dir,
207+
self.pipeline.output_path,
208+
self.raw_fastq_files_path,
209+
join(self.pipeline.output_path, 'NuQCJob'),
210+
config['nprocs'],
211+
config['nthreads'],
212+
config['fastqc_executable_path'],
213+
config['modules_to_load'],
214+
self.master_qiita_job_id,
215+
config['queue'],
216+
config['nodes'],
217+
config['wallclock_time_in_minutes'],
218+
config['job_total_memory_limit'],
219+
config['job_pool_size'],
220+
config['job_max_array_length'],
221+
True)
222+
mqcjob = MultiQCJob(self.pipeline.run_dir,
223+
self.pipeline.output_path,
224+
self.raw_fastq_files_path,
225+
join(self.pipeline.output_path, 'NuQCJob'),
226+
config['nprocs'],
227+
config['nthreads'],
228+
config['multiqc_executable_path'],
229+
config['modules_to_load'],
230+
self.master_qiita_job_id,
231+
config['queue'],
232+
config['nodes'],
233+
config['wallclock_time_in_minutes'],
234+
config['job_total_memory_limit'],
235+
config['job_pool_size'],
236+
join(self.pipeline.output_path, 'FastQCJob'),
237+
config['job_max_array_length'],
238+
config['multiqc_config_file_path'],
239+
True)
234240

235241
if 'FastQCJob' not in self.skip_steps:
236-
job.run(callback=self.job_callback)
242+
fcjob.run(callback=self.job_callback)
243+
244+
if 'MultiQCJob' not in self.skip_steps:
245+
mqcjob.run(callback=self.job_callback)
237246

238247
def generate_prep_file(self):
239248
config = self.pipeline.get_software_configuration('seqpro')
@@ -386,30 +395,49 @@ def quality_control(self):
386395

387396
def generate_reports(self):
388397
config = self.pipeline.get_software_configuration('fastqc')
389-
job = FastQCJob(self.pipeline.run_dir,
390-
self.pipeline.output_path,
391-
self.raw_fastq_files_path,
392-
join(self.pipeline.output_path, 'NuQCJob'),
393-
config['nprocs'],
394-
config['nthreads'],
395-
config['fastqc_executable_path'],
396-
config['modules_to_load'],
397-
self.master_qiita_job_id,
398-
config['queue'],
399-
config['nodes'],
400-
config['wallclock_time_in_minutes'],
401-
config['job_total_memory_limit'],
402-
config['job_pool_size'],
403-
config['multiqc_config_file_path'],
404-
config['job_max_array_length'],
405-
False)
398+
fqjob = FastQCJob(self.pipeline.run_dir,
399+
self.pipeline.output_path,
400+
self.raw_fastq_files_path,
401+
join(self.pipeline.output_path, 'NuQCJob'),
402+
config['nprocs'],
403+
config['nthreads'],
404+
config['fastqc_executable_path'],
405+
config['modules_to_load'],
406+
self.master_qiita_job_id,
407+
config['queue'],
408+
config['nodes'],
409+
config['wallclock_time_in_minutes'],
410+
config['job_total_memory_limit'],
411+
config['job_pool_size'],
412+
config['job_max_array_length'],
413+
False)
414+
mqcjob = MultiQCJob(self.pipeline.run_dir,
415+
self.pipeline.output_path,
416+
self.raw_fastq_files_path,
417+
join(self.pipeline.output_path, 'NuQCJob'),
418+
config['nprocs'],
419+
config['nthreads'],
420+
config['multiqc_executable_path'],
421+
config['modules_to_load'],
422+
self.master_qiita_job_id,
423+
config['queue'],
424+
config['nodes'],
425+
config['wallclock_time_in_minutes'],
426+
config['job_total_memory_limit'],
427+
config['job_pool_size'],
428+
join(self.pipeline.output_path, 'FastQCJob'),
429+
config['job_max_array_length'],
430+
config['multiqc_config_file_path'],
431+
False)
406432

407433
if 'FastQCJob' not in self.skip_steps:
408-
job.run(callback=self.job_callback)
434+
fqjob.run(callback=self.job_callback)
435+
if 'MultiQCJob' not in self.skip_steps:
436+
mqcjob.run(callback=self.job_callback)
409437

410-
failed_samples = job.audit(self.pipeline.get_sample_ids())
438+
failed_samples = fqjob.audit(self.pipeline.get_sample_ids())
411439
if hasattr(self, 'fsr'):
412-
self.fsr.write(failed_samples, job.__class__.__name__)
440+
self.fsr.write(failed_samples, fqjob.__class__.__name__)
413441
return failed_samples
414442

415443
def generate_prep_file(self):
@@ -534,12 +562,20 @@ def execute_pipeline(self):
534562
prep_paths = []
535563
self.prep_file_paths = {}
536564

565+
rematch = re.compile(
566+
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
567+
r"(?P<qid>[0-9]{5,6})\..\.tsv")
568+
537569
for root, dirs, files in walk(tmp):
538570
for _file in files:
539571
# breakup the prep-info-file into segments
540572
# (run-id, project_qid, other) and cleave
541573
# the qiita-id from the project_name.
542-
qid = _file.split('.')[1].split('_')[-1]
574+
rer = rematch.match(_file)
575+
if rer is None:
576+
continue
577+
578+
_, _, qid = rer.groups()
543579

544580
if qid not in self.prep_file_paths:
545581
self.prep_file_paths[qid] = []

qp_klp/Protocol.py

+6
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ def get_config(command):
9494

9595
return failed_samples
9696

97+
def integrate_results(self):
98+
pass
99+
100+
def generate_sequence_counts(self):
101+
pass
102+
97103

98104
class TellSeq(Protocol):
99105
protocol_type = PROTOCOL_NAME_TELLSEQ

qp_klp/StandardAmpliconWorkflow.py

+10-22
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
from .Protocol import Illumina
2-
from os.path import join, abspath, exists
2+
from os.path import join, abspath
33
from os import walk
4-
from shutil import rmtree
54
from sequence_processing_pipeline.Pipeline import Pipeline
65
from .Assays import Amplicon
76
from .Assays import ASSAY_NAME_AMPLICON
87
from .Workflows import Workflow
8+
import re
99

1010

1111
class StandardAmpliconWorkflow(Workflow, Amplicon, Illumina):
@@ -53,25 +53,6 @@ def __init__(self, **kwargs):
5353

5454
self.update = kwargs['update_qiita']
5555

56-
def determine_steps_to_skip(self):
57-
out_dir = self.pipeline.output_path
58-
59-
# Although amplicon runs don't perform host-filtering,
60-
# the output from ConvertJob is still copied and organized into
61-
# a form suitable for FastQCJob to process. Hence the presence or
62-
# absence of a 'NuQCJob' directory is still a thing (for now)
63-
directories_to_check = ['ConvertJob', 'NuQCJob',
64-
'FastQCJob', 'GenPrepFileJob']
65-
66-
for directory in directories_to_check:
67-
if exists(join(out_dir, directory)):
68-
if exists(join(out_dir, directory, 'job_completed')):
69-
# this step completed successfully.
70-
self.skip_steps.append(directory)
71-
else:
72-
# work stopped before this job could be completed.
73-
rmtree(join(out_dir, directory))
74-
7556
def execute_pipeline(self):
7657
'''
7758
Executes steps of pipeline in proper sequence.
@@ -124,13 +105,20 @@ def execute_pipeline(self):
124105

125106
prep_paths = []
126107
self.prep_file_paths = {}
108+
rematch = re.compile(
109+
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
110+
r"(?P<qid>[0-9]{5,6})\..\.tsv")
127111

128112
for root, dirs, files in walk(tmp):
129113
for _file in files:
130114
# breakup the prep-info-file into segments
131115
# (run-id, project_qid, other) and cleave
132116
# the qiita-id from the project_name.
133-
qid = _file.split('.')[1].split('_')[-1]
117+
rer = rematch.match(_file)
118+
if rer is None:
119+
continue
120+
121+
_, _, qid = rer.groups()
134122

135123
if qid not in self.prep_file_paths:
136124
self.prep_file_paths[qid] = []

qp_klp/StandardMetagenomicWorkflow.py

-17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
from .Protocol import Illumina
2-
from os.path import join, exists
3-
from shutil import rmtree
42
from sequence_processing_pipeline.Pipeline import Pipeline
53
from .Assays import Metagenomic
64
from .Assays import ASSAY_NAME_METAGENOMIC
@@ -51,18 +49,3 @@ def __init__(self, **kwargs):
5149
"type bool")
5250

5351
self.update = kwargs['update_qiita']
54-
55-
def determine_steps_to_skip(self):
56-
out_dir = self.pipeline.output_path
57-
58-
directories_to_check = ['ConvertJob', 'NuQCJob',
59-
'FastQCJob', 'GenPrepFileJob']
60-
61-
for directory in directories_to_check:
62-
if exists(join(out_dir, directory)):
63-
if exists(join(out_dir, directory, 'job_completed')):
64-
# this step completed successfully.
65-
self.skip_steps.append(directory)
66-
else:
67-
# work stopped before this job could be completed.
68-
rmtree(join(out_dir, directory))

qp_klp/StandardMetatranscriptomicWorkflow.py

-17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
from .Protocol import Illumina
2-
from os.path import join, exists
3-
from shutil import rmtree
42
from sequence_processing_pipeline.Pipeline import Pipeline
53
from .Assays import Metatranscriptomic
64
from .Assays import ASSAY_NAME_METATRANSCRIPTOMIC
@@ -52,18 +50,3 @@ def __init__(self, **kwargs):
5250
"type bool")
5351

5452
self.update = kwargs['update_qiita']
55-
56-
def determine_steps_to_skip(self):
57-
out_dir = self.pipeline.output_path
58-
59-
directories_to_check = ['ConvertJob', 'NuQCJob',
60-
'FastQCJob', 'GenPrepFileJob']
61-
62-
for directory in directories_to_check:
63-
if exists(join(out_dir, directory)):
64-
if exists(join(out_dir, directory, 'job_completed')):
65-
# this step completed successfully.
66-
self.skip_steps.append(directory)
67-
else:
68-
# work stopped before this job could be completed.
69-
rmtree(join(out_dir, directory))

qp_klp/TellseqMetagenomicWorkflow.py

+4-21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from .Protocol import TellSeq
2-
from os.path import join, exists
32
from sequence_processing_pipeline.Pipeline import Pipeline, InstrumentUtils
43
from .Assays import Metagenomic
54
from .Assays import ASSAY_NAME_METAGENOMIC
@@ -44,6 +43,10 @@ def __init__(self, **kwargs):
4443
self.lane_number = self.kwargs['lane_number']
4544
self.is_restart = bool(self.kwargs['is_restart'])
4645

46+
self.directories_to_check = [
47+
'TellReadJob', 'TRIntegrateJob', 'NuQCJob', 'FastQCJob',
48+
'SeqCountsJob', 'GenPrepFileJob']
49+
4750
if self.is_restart is True:
4851
self.determine_steps_to_skip()
4952

@@ -55,23 +58,3 @@ def __init__(self, **kwargs):
5558
"type bool")
5659

5760
self.update = kwargs['update_qiita']
58-
59-
def determine_steps_to_skip(self):
60-
out_dir = self.pipeline.output_path
61-
62-
directories_to_check = ['TellReadJob', 'TRIntegrateJob', 'NuQCJob',
63-
'FastQCJob', 'SeqCountsJob', 'GenPrepFileJob']
64-
65-
for directory in directories_to_check:
66-
if exists(join(out_dir, directory)):
67-
if exists(join(out_dir, directory, 'job_completed')):
68-
# this step completed successfully.
69-
self.skip_steps.append(directory)
70-
if exists(join(out_dir, directory,
71-
'post_processing_completed')):
72-
self.skip_steps.append('TRIJ_Post_Processing')
73-
else:
74-
# work stopped before this job could be completed.
75-
msg = "%s doesn't have job completed" % join(out_dir,
76-
directory)
77-
raise ValueError(msg)

0 commit comments

Comments (0)