rm duplicated execute_pipeline #104


Open · wants to merge 3 commits into base: main
233 changes: 129 additions & 104 deletions qp_klp/Assays.py
@@ -144,6 +144,130 @@ def _generate_artifact_name(self, prep_file_path):
# this is a normal pre-prep or sample-sheet.
return (a_name, False)

def execute_pipeline(self):
'''
Executes steps of pipeline in proper sequence.
:return: None
'''
if not self.is_restart:
self.pre_check()

# this is performed even in the event of a restart.
self.generate_special_map()

# even if a job is being skipped, it's being skipped because it was
# determined that it already completed successfully. Hence,
# increment the status because we are still iterating through them.

self.update_status("Converting data", 1, 9)
if "ConvertJob" not in self.skip_steps:
# converting raw data to fastq depends heavily on the instrument
# used to generate the run_directory. Hence this method is
# supplied by the instrument mixin.
self.convert_raw_to_fastq()
self.post_process_raw_fastq_output()
self.integrate_results()
self.generate_sequence_counts()

self.update_status("Performing quality control", 2, 9)


Is it possible to add a bit of additional commenting for this section? My understanding is that this PR is merging the previous Amplicon execute_pipeline method with the StandardAmpliconWorkflow execute_pipeline method, both of which are for (duh) Amplicon processing, right? But now we are moving the method up into the Assay class, which is the parent of both Amplicon and MetaOmic. I assume I am not reading the codebase right, because it looks to me like the post_process_raw_fastq_output() method only exists in Amplicon and the quality_control method only exists in MetaOmic. What is going to happen when an Amplicon assay invokes this method and gets to the quality_control() call, but doesn't have a mixin providing that method?
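
For what it's worth, the diff itself appears to answer this: further down, `Amplicon` gains a no-op `quality_control()` and `MetaOmic` gains a no-op `post_process_raw_fastq_output()`, so every subclass satisfies the full set of hooks the shared method calls. A minimal, runnable sketch of that pattern, with condensed hypothetical bodies (the prints are illustrative only; the real hooks sit behind skip-step checks):

```python
class Assay:
    def execute_pipeline(self):
        # the shared sequence calls both hooks; subclasses stub out
        # the ones that don't apply to them
        self.post_process_raw_fastq_output()
        self.quality_control()


class Amplicon(Assay):
    def post_process_raw_fastq_output(self):
        print("amplicon-specific post-processing")

    def quality_control(self):
        # no-op stub: QC is handled differently for amplicon runs
        pass


class MetaOmic(Assay):
    def quality_control(self):
        print("meta-omic quality control")

    def post_process_raw_fastq_output(self):
        # no-op stub: only amplicon runs need this step
        pass


Amplicon().execute_pipeline()  # post-processing prints; quality_control() does nothing
MetaOmic().execute_pipeline()  # quality_control() prints; post-processing does nothing
```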

if "NuQCJob" not in self.skip_steps:
# there is no failed samples reporting for amplicon runs.
self.quality_control()

self.update_status("Generating reports", 3, 9)
if "FastQCJob" not in self.skip_steps:
# reports are currently implemented by the assay mixin. This is
# only because metagenomic runs currently require a failed-samples
# report to be generated. This is not done for amplicon runs since
# demultiplexing occurs downstream of SPP.
self.generate_reports()

self.update_status("Generating preps", 4, 9)
if "GenPrepFileJob" not in self.skip_steps:
# preps are currently associated with the assay mixin, but only
# because there are currently some slight differences in how
# FastQCJob gets instantiated. This could get moved into a


A little confused by the mention of FastQCJob here since I thought we handled that at line 178 ... is this some sort of downstream consequence of that stuff?

# shared method, but probably still in Assay.
self.generate_prep_file()
else:
# if GenPrepFileJob is not run, then we need to find the


Hmm, this is a pretty large change in logic from both the previous versions of this code, right? In both the previous versions, this big block of code that populates the public self.prep_file_paths property was being performed in all cases, not just when GenPrepFileJob was being skipped or generate_prep_file wasn't being called. What has changed that makes it no longer necessary to explicitly populate this property every time?
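
For reference, a runnable condensation of the control-flow change being asked about; all names here are hypothetical stand-ins for the code in this diff:

```python
class WorkflowSketch:
    """Hypothetical condensation of the two versions under discussion."""

    def __init__(self, skip_steps):
        self.skip_steps = skip_steps
        self.prep_file_paths = None

    def generate_prep_file(self):
        print("running GenPrepFileJob")

    def _recover_existing_preps(self):
        # stand-in for the inline walk/regex block in the diff
        return {"12345": ["/hypothetical/path/prep.tsv"]}

    def old_flow(self):
        # previous versions: recovery always ran after generate_prep_file()
        self.generate_prep_file()
        self.prep_file_paths = self._recover_existing_preps()

    def new_flow(self):
        # this PR: recovery runs only when GenPrepFileJob is skipped, so
        # generate_prep_file() presumably populates prep_file_paths itself
        if "GenPrepFileJob" not in self.skip_steps:
            self.generate_prep_file()
        else:
            self.prep_file_paths = self._recover_existing_preps()


WorkflowSketch(skip_steps=[]).new_flow()  # prep_file_paths left unpopulated here
WorkflowSketch(skip_steps=["GenPrepFileJob"]).new_flow()  # recovered from disk
```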

# preps that were previously generated
tmp = join(self.pipeline.output_path,
'GenPrepFileJob', 'PrepFiles')

self.has_replicates = False

prep_paths = []
self.prep_file_paths = {}
rematch = re.compile(
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_-]+)"
r"(?P<qid>[0-9]{5,6})\..\.tsv")

for root, dirs, files in walk(tmp):
for _file in files:
# break up the prep-info-file into segments
# (run-id, project_qid, other) and cleave
# the qiita-id from the project_name.
rer = rematch.match(_file)
if rer is None:
continue

_, _, qid = rer.groups()

if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []

_path = abspath(join(root, _file))
if _path.endswith('.tsv'):
prep_paths.append(_path)
self.prep_file_paths[qid].append(_path)

for _dir in dirs:
if _dir == '1':
# if PrepFiles contains the '1' directory, then it's a
# given that this sample-sheet contains replicates.
self.has_replicates = True

# currently imported from Assay although it is a base method. It
# could be imported into Workflows potentially, since it is a post-
# processing step. All pairings of assay and instrument type need to
# generate prep-info files in the same format.
self.overwrite_prep_files(prep_paths)

# for now, simply re-run any line below as if it was a new job, even
# for a restart. functionality is idempotent, except for the
# registration of new preps in Qiita. These will simply be removed
# manually.

# post-processing steps are by default associated with the Workflow
# class, since they deal with fastq files and Qiita, and don't depend
# on assay or instrument type.
self.update_status("Generating sample information", 5, 9)
self.sifs = self.generate_sifs()

# post-processing step.
self.update_status("Registering blanks in Qiita", 6, 9)
if self.update:
self.update_blanks_in_qiita()

self.update_status("Loading preps into Qiita", 7, 9)
if self.update:
self.update_prep_templates()

# before we load preps into Qiita we need to copy the fastq
# files n times for n preps and correct the file-paths each
# prep is pointing to.
self.load_preps_into_qiita()

self.fsr.generate_report()


Could we toss this line a comment, too? I see that it was in Amplicon.execute_pipeline but not in StandardAmpliconWorkflow.execute_pipeline, so now I'm curious about why ... :D


self.update_status("Generating packaging commands", 8, 9)
self.generate_commands()

self.update_status("Packaging results", 9, 9)
if self.update:
self.execute_commands()
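
One note on the prep-file recovery in the else branch above: the `rematch` pattern expects filenames shaped like `<run-id>.<project-name><qiita-id>.<X>.tsv`, where the qiita-id is 5-6 digits and `<X>` is a single character. A small sketch of how it parses such a name (the filename is invented for illustration):

```python
import re

rematch = re.compile(
    r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_-]+)"
    r"(?P<qid>[0-9]{5,6})\..\.tsv")

# hypothetical prep-info filename: run-id, project name fused with a
# 5-digit qiita-id, and a one-character segment (e.g. a replicate number)
rer = rematch.match("20230101_RUN_01.SomeProject12345.1.tsv")

print(rer.group("runid"))  # 20230101_RUN_01
print(rer.group("qname"))  # SomeProject
print(rer.group("qid"))    # 12345
```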


class Amplicon(Assay):
AMPLICON_TYPE = 'Amplicon'
@@ -344,6 +468,9 @@ def load_preps_into_qiita(self):

return df

def quality_control(self):
pass


class MetaOmic(Assay):
"""
@@ -521,110 +648,8 @@ def load_preps_into_qiita(self):

return df

def execute_pipeline(self):
'''
Executes steps of pipeline in proper sequence.
:return: None
'''
self.pre_check()

self.generate_special_map()

self.update_status("Converting data", 1, 9)

self.convert_raw_to_fastq()

self.integrate_results()

self.generate_sequence_counts()

self.update_status("Performing quality control", 2, 9)
self.quality_control()

self.update_status("Generating reports", 3, 9)
self.generate_reports()

self.update_status("Generating preps", 4, 9)
self.generate_prep_file()

# moved final component of genprepfilejob outside of object.
# obtain the paths to the prep-files generated by GenPrepFileJob
# w/out having to recover full state.
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')

self.has_replicates = False

prep_paths = []
self.prep_file_paths = {}

rematch = re.compile(
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
r"(?P<qid>[0-9]{5,6})\..\.tsv")

for root, dirs, files in walk(tmp):
for _file in files:
# breakup the prep-info-file into segments
# (run-id, project_qid, other) and cleave
# the qiita-id from the project_name.
rer = rematch.match(_file)
if rer is None:
continue

_, _, qid = rer.groups()

if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []

_path = abspath(join(root, _file))
if _path.endswith('.tsv'):
prep_paths.append(_path)
self.prep_file_paths[qid].append(_path)

for _dir in dirs:
if _dir == '1':
# if PrepFiles contains the '1' directory, then it's a
# given that this sample-sheet contains replicates.
self.has_replicates = True

# currently imported from Assay although it is a base method. it
# could be imported into Workflows potentially, since it is a post-
# processing step. All pairings of assay and instrument type need to
# generate prep-info files in the same format.
self.overwrite_prep_files(prep_paths)

# for now, simply re-run any line below as if it was a new job, even
# for a restart. functionality is idempotent, except for the
# registration of new preps in Qiita. These will simply be removed
# manually.

# post-processing steps are by default associated with the Workflow
# class, since they deal with fastq files and Qiita, and don't depend
# on assay or instrument type.
self.update_status("Generating sample information", 5, 9)
self.sifs = self.generate_sifs()

# post-processing step.
self.update_status("Registering blanks in Qiita", 6, 9)
if self.update:
self.update_blanks_in_qiita()

self.update_status("Loading preps into Qiita", 7, 9)
if self.update:
self.update_prep_templates()

# before we load preps into Qiita we need to copy the fastq
# files n times for n preps and correct the file-paths each
# prep is pointing to.
self.load_preps_into_qiita()

self.fsr.generate_report()

self.update_status("Generating packaging commands", 8, 9)
self.generate_commands()

self.update_status("Packaging results", 9, 9)
if self.update:
self.execute_commands()
def post_process_raw_fastq_output(self):
pass


class Metagenomic(MetaOmic):
122 changes: 0 additions & 122 deletions qp_klp/StandardAmpliconWorkflow.py
@@ -1,11 +1,8 @@
from .Protocol import Illumina
from os.path import join, abspath
from os import walk
from sequence_processing_pipeline.Pipeline import Pipeline
from .Assays import Amplicon
from .Assays import ASSAY_NAME_AMPLICON
from .Workflows import Workflow
import re


class StandardAmpliconWorkflow(Workflow, Amplicon, Illumina):
@@ -52,122 +49,3 @@ def __init__(self, **kwargs):
"type bool")

self.update = kwargs['update_qiita']

def execute_pipeline(self):
'''
Executes steps of pipeline in proper sequence.
:return: None
'''
if not self.is_restart:
self.pre_check()

# this is performed even in the event of a restart.
self.generate_special_map()

# even if a job is being skipped, it's being skipped because it was
# determined that it already completed successfully. Hence,
# increment the status because we are still iterating through them.

self.update_status("Converting data", 1, 9)
if "ConvertJob" not in self.skip_steps:
# converting raw data to fastq depends heavily on the instrument
# used to generate the run_directory. Hence this method is
# supplied by the instrument mixin.
self.convert_raw_to_fastq()

self.update_status("Post-processing raw fasq output", 2, 9)
if "NuQCJob" not in self.skip_steps:
# there is no failed samples reporting for amplicon runs.
self.post_process_raw_fastq_output()

self.update_status("Generating reports", 3, 9)
if "FastQCJob" not in self.skip_steps:
# reports are currently implemented by the assay mixin. This is
# only because metagenomic runs currently require a failed-samples
# report to be generated. This is not done for amplicon runs since
# demultiplexing occurs downstream of SPP.
self.generate_reports()

self.update_status("Generating preps", 4, 9)
if "GenPrepFileJob" not in self.skip_steps:
# preps are currently associated with array mixin, but only
# because there are currently some slight differences in how
# FastQCJob gets instantiated(). This could get moved into a
# shared method, but probably still in Assay.
self.generate_prep_file()

# moved final component of genprepfilejob outside of object.
# obtain the paths to the prep-files generated by GenPrepFileJob
# w/out having to recover full state.
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')

self.has_replicates = False

prep_paths = []
self.prep_file_paths = {}
rematch = re.compile(
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
r"(?P<qid>[0-9]{5,6})\..\.tsv")

for root, dirs, files in walk(tmp):
for _file in files:
# breakup the prep-info-file into segments
# (run-id, project_qid, other) and cleave
# the qiita-id from the project_name.
rer = rematch.match(_file)
if rer is None:
continue

_, _, qid = rer.groups()

if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []

_path = abspath(join(root, _file))
if _path.endswith('.tsv'):
prep_paths.append(_path)
self.prep_file_paths[qid].append(_path)

for _dir in dirs:
if _dir == '1':
# if PrepFiles contains the '1' directory, then it's a
# given that this sample-sheet contains replicates.
self.has_replicates = True

# currently imported from Assay although it is a base method. it
# could be imported into Workflows potentially, since it is a post-
# processing step. All pairings of assay and instrument type need to
# generate prep-info files in the same format.
self.overwrite_prep_files(prep_paths)

# for now, simply re-run any line below as if it was a new job, even
# for a restart. functionality is idempotent, except for the
# registration of new preps in Qiita. These will simply be removed
# manually.

# post-processing steps are by default associated with the Workflow
# class, since they deal with fastq files and Qiita, and don't depend
# on assay or instrument type.
self.update_status("Generating sample information", 5, 9)
self.sifs = self.generate_sifs()

# post-processing step.
self.update_status("Registering blanks in Qiita", 6, 9)
if self.update:
self.update_blanks_in_qiita()

self.update_status("Loading preps into Qiita", 7, 9)
if self.update:
self.update_prep_templates()

# before we load preps into Qiita we need to copy the fastq
# files n times for n preps and correct the file-paths each
# prep is pointing to.
self.load_preps_into_qiita()

self.update_status("Generating packaging commands", 8, 9)
self.generate_commands()

self.update_status("Packaging results", 9, 9)
if self.update:
self.execute_commands()