test full runs in WorkflowFactory.py #131

Merged · 34 commits · May 6, 2025

Commits
c66c80e
inserting data to qiita for testing
antgonza Apr 22, 2025
3443a82
testing creation of study in qiit
antgonza Apr 22, 2025
1ea19c6
add sbatch
antgonza Apr 22, 2025
c505773
shopt
antgonza Apr 22, 2025
db3258d
echo .profile
antgonza Apr 22, 2025
23d428d
slurm
antgonza Apr 23, 2025
026da91
partition
antgonza Apr 23, 2025
e0bb95c
scontrol create partition
antgonza Apr 23, 2025
cff7adb
mv scontrol
antgonza Apr 23, 2025
7b71a72
partitionname
antgonza Apr 23, 2025
a886761
sudo scontrol
antgonza Apr 23, 2025
fb190ab
add some prints
antgonza Apr 23, 2025
e2d236b
/usr/bin/sbatch
antgonza Apr 23, 2025
980d35e
sudo
antgonza Apr 23, 2025
1ce4e3d
env
antgonza Apr 23, 2025
ea9b962
.local/bin/sbatch
antgonza Apr 23, 2025
e77048c
ls
antgonza Apr 23, 2025
2faf214
sbatch in conda
antgonza Apr 23, 2025
2af7794
squeue
antgonza Apr 23, 2025
43e0395
improve error display and running tests
antgonza Apr 24, 2025
6c32c70
sbatch
antgonza Apr 24, 2025
15a84b0
GITHUB_PATH
antgonza Apr 24, 2025
88e3b70
adding files to tests/bin
antgonza Apr 24, 2025
dc5b1d0
test_metagenomic_workflow_creation
antgonza Apr 30, 2025
3842dc6
adding _inject_data
antgonza Apr 30, 2025
68ecebc
merging main
antgonza May 1, 2025
5d89746
fixing some tests
antgonza May 5, 2025
9d34625
fixing other tests
antgonza May 5, 2025
b368754
copyfile -> touch()
antgonza May 5, 2025
62f3ab8
more copyfile -> touch()
antgonza May 5, 2025
c0d2820
fix test_tellseq_workflow_creation
antgonza May 5, 2025
3ee3e49
addressing @AmandaBirmingham comments
antgonza May 6, 2025
7f2b47a
flake8
antgonza May 6, 2025
110aeed
addressing @AmandaBirmingham note about comments
antgonza May 6, 2025
230 changes: 122 additions & 108 deletions src/qp_klp/Assays.py
@@ -8,7 +8,6 @@
import pandas as pd
from json import dumps
from collections import defaultdict
import re


ASSAY_NAME_NONE = "Assay"
@@ -144,13 +143,133 @@ def _generate_artifact_name(self, prep_file_path):
# this is a normal pre-prep or sample-sheet.
return (a_name, False)

def execute_pipeline(self):
'''
Executes steps of pipeline in proper sequence.
:return: None
'''
if not self.is_restart:
self.pre_check()

# this is performed even in the event of a restart.
self.generate_special_map()

# even if a job is being skipped, it's being skipped because it was
# determined that it already completed successfully. Hence,
# increment the status because we are still iterating through them.

self.update_status("Converting data", 1, 9)
if "ConvertJob" not in self.skip_steps:
# converting raw data to fastq depends heavily on the instrument
# used to generate the run_directory. Hence this method is
# supplied by the instrument mixin.
self.convert_raw_to_fastq()
self.integrate_results()
self.generate_sequence_counts()

self.update_status("QC-ing reads", 2, 9)
if "NuQCJob" not in self.skip_steps:
self.qc_reads()

self.update_status("Generating reports", 3, 9)
if "FastQCJob" not in self.skip_steps:
# reports are currently implemented by the assay mixin. This is
# only because metagenomic runs currently require a failed-samples
# report to be generated. This is not done for amplicon runs since
# demultiplexing occurs downstream of SPP.
self.generate_reports()

self.update_status("Generating preps", 4, 9)
if "GenPrepFileJob" not in self.skip_steps:
self.generate_prep_file()

# moved final component of genprepfilejob outside of object.
# obtain the paths to the prep-files generated by GenPrepFileJob
# w/out having to recover full state.
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')

self.has_replicates = False

prep_paths = []
self.prep_file_paths = {}

for root, dirs, files in walk(tmp):
for _file in files:
# we are looking for .tsv files, and we are only interested
# in the string after the last '_', which is the study_id
if not _file.endswith('.tsv'):
continue
# continue if no underscore
chunks = _file.rsplit('_', 1)
if len(chunks) <= 1:
continue
# skip if the segment before the first '.' is not an integer
qid = chunks[-1].split('.')[0]
if not qid.isnumeric():
continue
if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []

_path = abspath(join(root, _file))
prep_paths.append(_path)
self.prep_file_paths[qid].append(_path)

for _dir in dirs:
if _dir == '1':
# if PrepFiles contains the '1' directory, then it's a
# given that this sample-sheet contains replicates.
self.has_replicates = True

# currently imported from Assay although it is a base method. it
# could be imported into Workflows potentially, since it is a post-
# processing step. All pairings of assay and instrument type need to
# generate prep-info files in the same format.
self.overwrite_prep_files(prep_paths)

# for now, simply re-run any line below as if it was a new job, even
# for a restart. functionality is idempotent, except for the
# registration of new preps in Qiita. These will simply be removed
# manually.

# post-processing steps are by default associated with the Workflow
# class, since they deal with fastq files and Qiita, and don't depend
# on assay or instrument type.
self.update_status("Generating sample information", 5, 9)
self.sifs = self.generate_sifs()

# post-processing step.
self.update_status("Registering blanks in Qiita", 6, 9)
if self.update:
self.update_blanks_in_qiita()

self.update_status("Loading preps into Qiita", 7, 9)
if self.update:
self.update_prep_templates()

# before we load preps into Qiita we need to copy the fastq
# files n times for n preps and correct the file-paths each
# prep is pointing to.
self.load_preps_into_qiita()

# before we pack the results, we need to generate the human-readable
# report of samples lost in each step. The tracking is being done
# within fsr (FailedSamplesRecord), in conjunction with Job.audit.
self.fsr.generate_report()
Contributor
Could we toss this line a comment, too? I see that it was in MetaOmic.execute_pipeline but not in StandardAmpliconWorkflow.execute_pipeline, so now I'm curious about why ... :D. I also notice that at one point in MetaOmic the code checks if hasattr(self, 'fsr'): before calling self.fsr.<something>() ... do we need to worry that not every assay type or every individual assay instance will have an fsr property?

Member Author

fsr is an instance of FailedSamplesRecord, which is its own object and, as far as I can tell, is used with Job.audit to keep track of the samples lost at each step of the pipeline. Here we call FailedSamplesRecord.generate_report to output the report so it's moved to the final output and the user can access it. Adding this info to the code.
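
For readers following the thread, a minimal sketch of the pattern under discussion; the class shape and signatures below are illustrative, not the actual sequence_processing_pipeline API:

class FailedSamplesRecord:
    # illustrative sketch only; the real class lives in
    # sequence_processing_pipeline and its signatures may differ
    def __init__(self, output_path):
        self.output_path = output_path
        self.failed = {}  # sample-id -> first job in which it failed

    def update(self, failed_ids, job_name):
        # fed by Job.audit after each step; keep only the first failure
        for sample_id in failed_ids:
            self.failed.setdefault(sample_id, job_name)

    def generate_report(self):
        # write a human-readable summary into the final output directory
        with open(f"{self.output_path}/failed_samples.tsv", "w") as fh:
            for sample_id, job_name in self.failed.items():
                fh.write(f"{sample_id}\tfailed in {job_name}\n")

def finish(workflow):
    # the defensive guard the reviewer mentions: amplicon workflows do
    # not create an 'fsr' attribute, so the report step is skipped there
    if hasattr(workflow, 'fsr'):
        workflow.fsr.generate_report()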


self.update_status("Generating packaging commands", 8, 9)
self.generate_commands()

self.update_status("Packaging results", 9, 9)
if self.update:
self.execute_commands()


class Amplicon(Assay):
AMPLICON_TYPE = 'Amplicon'
AMPLICON_SUB_TYPES = {'16S', '18S', 'ITS'}
assay_type = ASSAY_NAME_AMPLICON

def post_process_raw_fastq_output(self):
def qc_reads(self):
"""
Post-process ConvertJob output into correct form for FastQCJob.
"""
@@ -353,7 +472,7 @@ class MetaOmic(Assay):
# MetaOmic does not have an assay_type of its own. It is defined by its
# children.

def quality_control(self):
def qc_reads(self):
# because this is a mixin, assume containing object will contain
# a pipeline object.
config = self.pipeline.get_software_configuration('nu-qc')
@@ -521,111 +640,6 @@ def load_preps_into_qiita(self):

return df

def execute_pipeline(self):
'''
Executes steps of pipeline in proper sequence.
:return: None
'''
self.pre_check()

self.generate_special_map()

self.update_status("Converting data", 1, 9)

self.convert_raw_to_fastq()

self.integrate_results()

self.generate_sequence_counts()

self.update_status("Performing quality control", 2, 9)
self.quality_control()

self.update_status("Generating reports", 3, 9)
self.generate_reports()

self.update_status("Generating preps", 4, 9)
self.generate_prep_file()

# moved final component of genprepfilejob outside of object.
# obtain the paths to the prep-files generated by GenPrepFileJob
# w/out having to recover full state.
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')

self.has_replicates = False

prep_paths = []
self.prep_file_paths = {}

rematch = re.compile(
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
r"(?P<qid>[0-9]{5,6})\..\.tsv")

for root, dirs, files in walk(tmp):
for _file in files:
# breakup the prep-info-file into segments
# (run-id, project_qid, other) and cleave
# the qiita-id from the project_name.
rer = rematch.match(_file)
if rer is None:
continue

_, _, qid = rer.groups()

if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []

_path = abspath(join(root, _file))
if _path.endswith('.tsv'):
prep_paths.append(_path)
self.prep_file_paths[qid].append(_path)

for _dir in dirs:
if _dir == '1':
# if PrepFiles contains the '1' directory, then it's a
# given that this sample-sheet contains replicates.
self.has_replicates = True

# currently imported from Assay although it is a base method. it
# could be imported into Workflows potentially, since it is a post-
# processing step. All pairings of assay and instrument type need to
# generate prep-info files in the same format.
self.overwrite_prep_files(prep_paths)

# for now, simply re-run any line below as if it was a new job, even
# for a restart. functionality is idempotent, except for the
# registration of new preps in Qiita. These will simply be removed
# manually.

# post-processing steps are by default associated with the Workflow
# class, since they deal with fastq files and Qiita, and don't depend
# on assay or instrument type.
self.update_status("Generating sample information", 5, 9)
self.sifs = self.generate_sifs()

# post-processing step.
self.update_status("Registering blanks in Qiita", 6, 9)
if self.update:
self.update_blanks_in_qiita()

self.update_status("Loading preps into Qiita", 7, 9)
if self.update:
self.update_prep_templates()

# before we load preps into Qiita we need to copy the fastq
# files n times for n preps and correct the file-paths each
# prep is pointing to.
self.load_preps_into_qiita()

self.fsr.generate_report()

self.update_status("Generating packaging commands", 8, 9)
self.generate_commands()

self.update_status("Packaging results", 9, 9)
if self.update:
self.execute_commands()


class Metagenomic(MetaOmic):
METAGENOMIC_TYPE = 'Metagenomic'
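For context on the parsing change: the removed execute_pipeline code (above, and in StandardAmpliconWorkflow.py below) matched prep-file names with a regex, while the new version in Assays.py splits on the last underscore and checks that the token before the first '.' of the final chunk is numeric. A standalone sketch of both approaches, using a hypothetical filename:

import re

# old approach (removed): anchor a regex over the whole prep-file name
rematch = re.compile(
    r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
    r"(?P<qid>[0-9]{5,6})\..\.tsv")

# new approach: take the chunk after the last '_' and keep the file
# only if the text before its first '.' is numeric (the qiita id)
def extract_qid(filename):
    if not filename.endswith('.tsv'):
        return None
    chunks = filename.rsplit('_', 1)
    if len(chunks) <= 1:
        return None
    qid = chunks[-1].split('.')[0]
    return qid if qid.isnumeric() else None

# hypothetical prep-file name, for illustration only
name = '250421_A01535_0001_BH3FKJDSX7.MyProject_Analysis_12986.1.tsv'
assert rematch.match(name).group('qid') == '12986'
assert extract_qid(name) == '12986'

# the new parsing is looser: it also accepts names the regex rejected,
# e.g. qiita ids shorter than five digits
assert extract_qid('someproject_999.tsv') == '999'
assert rematch.match('someproject_999.tsv') is None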
122 changes: 0 additions & 122 deletions src/qp_klp/StandardAmpliconWorkflow.py
@@ -1,11 +1,8 @@
from .Protocol import Illumina
from os.path import join, abspath
from os import walk
from sequence_processing_pipeline.Pipeline import Pipeline
from .Assays import Amplicon
from .Assays import ASSAY_NAME_AMPLICON
from .Workflows import Workflow
import re


class StandardAmpliconWorkflow(Workflow, Amplicon, Illumina):
@@ -52,122 +49,3 @@ def __init__(self, **kwargs):
"type bool")

self.update = kwargs['update_qiita']

def execute_pipeline(self):
'''
Executes steps of pipeline in proper sequence.
:return: None
'''
if not self.is_restart:
self.pre_check()

# this is performed even in the event of a restart.
self.generate_special_map()

# even if a job is being skipped, it's being skipped because it was
# determined that it already completed successfully. Hence,
# increment the status because we are still iterating through them.

self.update_status("Converting data", 1, 9)
if "ConvertJob" not in self.skip_steps:
# converting raw data to fastq depends heavily on the instrument
# used to generate the run_directory. Hence this method is
# supplied by the instrument mixin.
self.convert_raw_to_fastq()

self.update_status("Post-processing raw fasq output", 2, 9)
if "NuQCJob" not in self.skip_steps:
# there is no failed samples reporting for amplicon runs.
self.post_process_raw_fastq_output()

self.update_status("Generating reports", 3, 9)
if "FastQCJob" not in self.skip_steps:
# reports are currently implemented by the assay mixin. This is
# only because metagenomic runs currently require a failed-samples
# report to be generated. This is not done for amplicon runs since
# demultiplexing occurs downstream of SPP.
self.generate_reports()

self.update_status("Generating preps", 4, 9)
if "GenPrepFileJob" not in self.skip_steps:
# preps are currently associated with array mixin, but only
# because there are currently some slight differences in how
# FastQCJob gets instantiated(). This could get moved into a
# shared method, but probably still in Assay.
self.generate_prep_file()

# moved final component of genprepfilejob outside of object.
# obtain the paths to the prep-files generated by GenPrepFileJob
# w/out having to recover full state.
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')

self.has_replicates = False

prep_paths = []
self.prep_file_paths = {}
rematch = re.compile(
r"(?P<runid>[a-zA-Z0-9_-]+)\.(?P<qname>[a-zA-Z0-9_]+)"
r"(?P<qid>[0-9]{5,6})\..\.tsv")

for root, dirs, files in walk(tmp):
for _file in files:
# breakup the prep-info-file into segments
# (run-id, project_qid, other) and cleave
# the qiita-id from the project_name.
rer = rematch.match(_file)
if rer is None:
continue

_, _, qid = rer.groups()

if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []

_path = abspath(join(root, _file))
if _path.endswith('.tsv'):
prep_paths.append(_path)
self.prep_file_paths[qid].append(_path)

for _dir in dirs:
if _dir == '1':
# if PrepFiles contains the '1' directory, then it's a
# given that this sample-sheet contains replicates.
self.has_replicates = True

# currently imported from Assay although it is a base method. it
# could be imported into Workflows potentially, since it is a post-
# processing step. All pairings of assay and instrument type need to
# generate prep-info files in the same format.
self.overwrite_prep_files(prep_paths)

# for now, simply re-run any line below as if it was a new job, even
# for a restart. functionality is idempotent, except for the
# registration of new preps in Qiita. These will simply be removed
# manually.

# post-processing steps are by default associated with the Workflow
# class, since they deal with fastq files and Qiita, and don't depend
# on assay or instrument type.
self.update_status("Generating sample information", 5, 9)
self.sifs = self.generate_sifs()

# post-processing step.
self.update_status("Registering blanks in Qiita", 6, 9)
if self.update:
self.update_blanks_in_qiita()

self.update_status("Loading preps into Qiita", 7, 9)
if self.update:
self.update_prep_templates()

# before we load preps into Qiita we need to copy the fastq
# files n times for n preps and correct the file-paths each
# prep is pointing to.
self.load_preps_into_qiita()

self.update_status("Generating packaging commands", 8, 9)
self.generate_commands()

self.update_status("Packaging results", 9, 9)
if self.update:
self.execute_commands()
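
To make the restart behavior concrete, a minimal sketch, assuming a simplified runner, of the skip_steps pattern used in the new execute_pipeline: jobs that already completed are skipped on a restart, but the status counter still advances so progress reporting stays consistent.

class PipelineRunner:
    # illustrative stand-in for the Workflow/Assay classes above
    def __init__(self, skip_steps=None):
        # on a restart, skip_steps holds jobs that already completed
        self.skip_steps = set(skip_steps or [])

    def update_status(self, msg, step, total):
        print(f"[{step}/{total}] {msg}")

    def run(self, steps):
        total = len(steps)
        for i, (job_name, action) in enumerate(steps, start=1):
            # increment the status even when skipping, because we are
            # still iterating through the steps in order
            self.update_status(f"Running {job_name}", i, total)
            if job_name in self.skip_steps:
                continue
            action()

# a restart where raw-to-fastq conversion already succeeded
runner = PipelineRunner(skip_steps=['ConvertJob'])
runner.run([
    ('ConvertJob', lambda: print('convert raw to fastq')),
    ('NuQCJob', lambda: print('qc reads')),
    ('FastQCJob', lambda: print('generate reports')),
    ('GenPrepFileJob', lambda: print('generate prep files')),
])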