From 99e627b8b0eaa9abaf87d4306036af872692cafe Mon Sep 17 00:00:00 2001
From: Charles Cowart <42684307+charles-cowart@users.noreply.github.com>
Date: Wed, 26 Jun 2024 05:30:44 -0700
Subject: [PATCH] Updates based on initial runs in production. (#89)

* Updates based on initial runs in production.

* Updates from production. tests updated.

---
 qp_klp/Step.py                                | 62 +++++++++++++------
 qp_klp/klp.py                                 |  2 +
 .../iseq_metagenomic.json                     |  2 +-
 .../novaseq_amplicon.json                     |  2 +-
 .../novaseq_metatranscriptomic.json           |  2 +-
 qp_klp/tests/data/process_all_fastq_files.sh  | 50 +++++++--------
 6 files changed, 73 insertions(+), 47 deletions(-)

diff --git a/qp_klp/Step.py b/qp_klp/Step.py
index b0645b6..326c572 100644
--- a/qp_klp/Step.py
+++ b/qp_klp/Step.py
@@ -1,9 +1,8 @@
-from itertools import chain
 from collections import defaultdict
 from json import dumps, load
 from metapool import load_sample_sheet
 from os import makedirs, walk, listdir
-from os.path import join, exists, split, basename, dirname
+from os.path import join, exists, split, basename, dirname, abspath
 from sequence_processing_pipeline.ConvertJob import ConvertJob
 from sequence_processing_pipeline.FastQCJob import FastQCJob
 from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob
@@ -380,11 +379,6 @@ def _generate_prep_file(self, config, input_file_path, seqpro_path):

         gpf_job.run(callback=self.update_callback)

-        # concatenate the lists of paths across all study_ids into a single
-        # list. Replace sample-names w/tube-ids in all relevant prep-files.
-        preps = list(chain.from_iterable(gpf_job.prep_file_paths.values()))
-        self._overwrite_prep_files(preps)
-
         return gpf_job

     def _helper_process_fastp_report_dirs(self):
@@ -727,6 +721,7 @@ def _get_tube_ids_from_qiita(self, qclient):

         # use empty dict {} as an indication that get_tube_ids_from_qiita was
         # called but no tube-ids were found for any project.
+        # to clarify, self.tube_id_map maps sample-names to tube-ids.
         self.tube_id_map = tids_by_qiita_id
         # should samples_in_qiita be none if tube_id_map is not?
         self.samples_in_qiita = sample_names_by_qiita_id
@@ -860,7 +855,10 @@ def _process_tube_ids(self, project_name, qiita_id, samples):
     # return None otherwise

     @classmethod
-    def _replace_with_tube_ids(cls, prep_file_path, tube_id_map):
+    def _replace_tube_ids_w_sample_names(cls, prep_file_path, tube_id_map):
+        # reversed_map maps tube-ids to sample-names
+        reversed_map = {tube_id_map[k]: k for k in tube_id_map}
+
         # passing tube_id_map as a parameter allows for easier testing.
         df = pd.read_csv(prep_file_path, sep='\t', dtype=str, index_col=False)
         # save copy of sample_name column as 'old_sample_name'
@@ -874,16 +872,13 @@ def _replace_with_tube_ids(cls, prep_file_path, tube_id_map):
             # remove leading zeroes if they exist to match Qiita results.
             sample_name = sample_name.lstrip('0')

-            reversed_map = {tube_id_map[k]: k for k in tube_id_map}
             if sample_name in reversed_map:
                 df.at[i, "sample_name"] = reversed_map[sample_name]

         df.to_csv(prep_file_path, index=False, sep="\t")

     def _overwrite_prep_files(self, prep_file_paths):
-        # replaces sample-names in prep-files with tube-ids according to
-        # a dict with project-names as keys and another dict as a value.
-        # this dict uses sample-names as keys and tube-ids as values.
+        # replace tube-ids in prep-info files w/sample-names.
         if self.tube_id_map is None:
             raise ValueError("get_tube_ids_from_qiita() was not called")

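Note: a minimal sketch (not part of the patch) of how the inverted map above drives the per-row replacement; the map, sample-names, and tube-ids are hypothetical:

    import pandas as pd

    # hypothetical per-project map from Qiita: sample-name -> tube-id
    tube_id_map = {'sample.A': 'tube.1', 'sample.B': 'tube.2'}
    # invert once, outside the row loop: tube-id -> sample-name
    reversed_map = {tube_id_map[k]: k for k in tube_id_map}

    df = pd.DataFrame({'sample_name': ['tube.1', '0tube.2', 'other']})
    df['old_sample_name'] = df['sample_name']

    for i, sample_name in df['sample_name'].items():
        # remove leading zeroes, as Qiita does
        sample_name = sample_name.lstrip('0')
        if sample_name in reversed_map:
            df.at[i, 'sample_name'] = reversed_map[sample_name]

    # df['sample_name'] is now ['sample.A', 'sample.B', 'other']
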
@@ -905,12 +900,10 @@ def _overwrite_prep_files(self, prep_file_paths):
             if len(matching_files) == 0:
                 continue

-            if len(matching_files) > 1:
-                raise ValueError("More than one match found for project "
-                                 f"'{fqp_name}': {str(matching_files)}")
-
-            Step._replace_with_tube_ids(matching_files[0],
-                                        self.tube_id_map[qiita_id])
+            for matching_file in matching_files:
+                Step._replace_tube_ids_w_sample_names(matching_file,
+                                                      self.tube_id_map[
+                                                          qiita_id])

     def update_blanks_in_qiita(self, qclient):
         for sif_path in self.sifs:
@@ -1010,6 +1003,39 @@ def execute_pipeline(self, qclient, increment_status, update=True,
         if "GenPrepFileJob" not in skip_steps:
             self.generate_prep_file()

+        # moved the final component of GenPrepFileJob outside the object.
+        # obtain the paths to the prep-files generated by GenPrepFileJob
+        # w/out having to recover full state.
+        tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')
+
+        self.has_replicates = False
+
+        prep_paths = []
+        self.prep_file_paths = {}
+
+        for root, dirs, files in walk(tmp):
+            for _file in files:
+                # break up the prep-info-file name into segments
+                # (run-id, project_qid, other) and cleave
+                # the qiita-id from the project_name.
+                qid = _file.split('.')[1].split('_')[-1]
+
+                if qid not in self.prep_file_paths:
+                    self.prep_file_paths[qid] = []
+
+                _path = abspath(join(root, _file))
+                if _path.endswith('.tsv'):
+                    prep_paths.append(_path)
+                    self.prep_file_paths[qid].append(_path)
+
+            for _dir in dirs:
+                if _dir == '1':
+                    # if PrepFiles contains the '1' directory, then it's a
+                    # given that this sample-sheet contains replicates.
+                    self.has_replicates = True
+
+        self._overwrite_prep_files(prep_paths)
+
         # for now, simply re-run any line below as if it was a new job, even
         # for a restart. functionality is idempotent, except for the
         # registration of new preps in Qiita. These will simply be removed
diff --git a/qp_klp/klp.py b/qp_klp/klp.py
index 3cba6a3..12d7eb1 100644
--- a/qp_klp/klp.py
+++ b/qp_klp/klp.py
@@ -180,6 +180,8 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
         if exists(join(out_dir, 'GenPrepFileJob')):
             skip_steps.append('FastQCJob')

+        # it doesn't matter whether cmds.log is a valid cmds.log or just
+        # an empty file; cmds.log will get overwritten downstream.
         if exists(join(out_dir, 'cmds.log')):
             skip_steps.append('GenPrepFileJob')

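Note: the walk above leans on the prep-info file naming convention (run-id, then project-name carrying a trailing qiita-id, then the rest). A sketch of the parse, with a hypothetical file name:

    # <run-id>.<project-name>_<qiita-id>.<other>.tsv (name is hypothetical)
    _file = '211021_A00000_0000_SAMPLE.MyProject_12345.1.tsv'

    # the second dot-delimited segment is the project_qid; its final
    # underscore-delimited token is the qiita-id.
    qid = _file.split('.')[1].split('_')[-1]
    assert qid == '12345'
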
diff --git a/qp_klp/tests/data/configuration_profiles/iseq_metagenomic.json b/qp_klp/tests/data/configuration_profiles/iseq_metagenomic.json
index cc76e80..cba9b72 100644
--- a/qp_klp/tests/data/configuration_profiles/iseq_metagenomic.json
+++ b/qp_klp/tests/data/configuration_profiles/iseq_metagenomic.json
@@ -92,4 +92,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/qp_klp/tests/data/configuration_profiles/novaseq_amplicon.json b/qp_klp/tests/data/configuration_profiles/novaseq_amplicon.json
index 40be83f..10aa695 100644
--- a/qp_klp/tests/data/configuration_profiles/novaseq_amplicon.json
+++ b/qp_klp/tests/data/configuration_profiles/novaseq_amplicon.json
@@ -60,4 +60,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/qp_klp/tests/data/configuration_profiles/novaseq_metatranscriptomic.json b/qp_klp/tests/data/configuration_profiles/novaseq_metatranscriptomic.json
index c56951f..449638a 100644
--- a/qp_klp/tests/data/configuration_profiles/novaseq_metatranscriptomic.json
+++ b/qp_klp/tests/data/configuration_profiles/novaseq_metatranscriptomic.json
@@ -92,4 +92,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/qp_klp/tests/data/process_all_fastq_files.sh b/qp_klp/tests/data/process_all_fastq_files.sh
index 57a99dc..3e01ff9 100644
--- a/qp_klp/tests/data/process_all_fastq_files.sh
+++ b/qp_klp/tests/data/process_all_fastq_files.sh
@@ -9,7 +9,9 @@
 ### as well as sbatch -c. demux threads remains fixed at 1.
 ### Note -c set to 4 and thread counts set to 7 during testing.
 #SBATCH -c 2
-#SBATCH --gres=node_jobs:2
+### Commented out for now, but there is a possibility it will be needed
+### in the future.
+###SBATCH --gres=node_jobs:2


 echo "---------------"
@@ -53,8 +55,8 @@ export TMPDIR=${TMPDIR}
 export TMPDIR=$(mktemp -d)
 echo $TMPDIR

-mkdir -p ${WKDIR}NuQCJob/fastp_reports_dir/html
-mkdir -p ${WKDIR}NuQCJob/fastp_reports_dir/json
+mkdir -p ${WKDIR}/fastp_reports_dir/html
+mkdir -p ${WKDIR}/fastp_reports_dir/json

 export ADAPTER_ONLY_OUTPUT=${OUTPUT}/only-adapter-filtered
 mkdir -p ${ADAPTER_ONLY_OUTPUT}
@@ -74,9 +76,8 @@ function mux-runner () {

     jobd=${TMPDIR}
     id_map=${jobd}/id_map
-    seqs_r1=${jobd}/seqs.r1.fastq.gz
-    seqs_r2=${jobd}/seqs.r2.fastq
-    r1_filt=${jobd}/seqs.r1.adapter-removed.fastq.gz
+    seqs_reads=${jobd}/seqs.interleaved.fastq
+    seq_reads_filter_alignment=${jobd}/seqs.interleaved.filter_alignment.fastq

     for i in $(seq 1 ${n})
     do
@@ -86,7 +87,7 @@ function mux-runner () {
         base=$(echo ${line} | cut -f 3 -d" ")
         r1_name=$(basename ${r1} .fastq.gz)
         r2_name=$(basename ${r2} .fastq.gz)
-        r1_adapter_only=${ADAPTER_ONLY_OUTPUT}/${r1_name}.fastq.gz
+        r_adapter_only=${ADAPTER_ONLY_OUTPUT}/${r1_name}.interleave.fastq.gz

         s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//')
         html_name=$(echo "$s_name.html")
@@ -94,7 +95,12 @@ function mux-runner () {
         json_name=$(echo "$s_name.json")

         echo -e "${i}\t${r1_name}\t${r2_name}\t${base}" >> ${id_map}

+        # movi, in the current version, works on the interleaved version of
+        # the fwd/rev reads, so we are going to take advantage of fastp's
+        # default output to minimize steps. Additionally, movi expects the
+        # input to not be gzipped, so we do not compress ${seqs_reads}.
+
         fastp \
             -l 100 \
             -i ${r1} \
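Note: a toy illustration (not from the pipeline) of the interleaved record layout the comment above refers to, with hypothetical reads; fastp's default paired-end stdout emits R1/R2 records alternately:

    # one FASTQ record is four lines: header, sequence, '+', qualities
    r1 = [['@read1/1', 'ACGTACGT', '+', 'IIIIIIII']]
    r2 = [['@read1/2', 'TTGCATGC', '+', 'IIIIIIII']]

    interleaved = []
    for fwd, rev in zip(r1, r2):
        interleaved += fwd + rev  # R1 record, then its mate's R2 record

    print('\n'.join(interleaved))
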
@@ -102,47 +108,39 @@
             --adapter_fasta fastp_known_adapters_formatted.fna \
             --html REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/fastp_reports_dir/html/${html_name} \
             --json REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/fastp_reports_dir/json/${json_name} \
-            --stdout | gzip > ${r1_filt}
+            --stdout | gzip > ${r_adapter_only}

         # multiplex and write adapter filtered data all at once
-        zcat ${r1_filt} | \
+        zcat ${r_adapter_only} | \
             sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/" \
-            >> ${seqs_r1}
-        cat ${r1_filt} | \
-            gzip -c > ${r1_adapter_only} &
-        wait
-
-        rm ${r1_filt} &
-        wait
+            >> ${seqs_reads}
     done

     # minimap/samtools pair commands are now generated in NuQCJob._generate_mmi_filter_cmds()
-    # and passed to this template. This method assumes ${jobd} is the correct location to
-    # filter files, the initial file is "${jobd}/seqs.r1.fastq"), and the output name is
-    # "${jobd}/seqs.r1.ALIGN.fastq".
-    minimap2 -2 -ax sr -t 1 /databases/minimap2/db_1.mmi ${jobd}/seqs.r1.fastq -a | samtools fastq -@ 1 -f 12 -F 256 > ${jobd}/foo
+    # and passed to this template.
+    minimap2 -2 -ax sr -t 1 /databases/minimap2/db_1.mmi ${jobd}/seqs.interleaved.fastq -a | samtools fastq -@ 1 -f 12 -F 256 > ${jobd}/foo
 minimap2 -2 -ax sr -t 1 /databases/minimap2/db_2.mmi ${jobd}/foo -a | samtools fastq -@ 1 -f 12 -F 256 > ${jobd}/bar
-mv ${jobd}/bar ${jobd}/seqs.r1.ALIGN.fastq
+mv ${jobd}/bar ${jobd}/seqs.interleaved.filter_alignment.fastq
 [ -e ${jobd}/foo ] && rm ${jobd}/foo
 [ -e ${jobd}/bar ] && rm ${jobd}/bar

     /home/user/user_dir/Movi/build/movi-default query \
         --index /scratch/movi_hg38_chm13_hprc94 \
-        --read <(zcat ${jobd}/seqs.r1.ALIGN.fastq.gz) \
+        --read ${seq_reads_filter_alignment} \
         --stdout | gzip > ${jobd}/seqs.movi.txt.gz

     python /home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py <(zcat ${jobd}/seqs.movi.txt.gz) | \
-        seqtk subseq ${jobd}/seqs.r1.ALIGN.fastq.gz - | gzip > ${jobd}/seqs.r1.final.fastq.gz
+        seqtk subseq ${seq_reads_filter_alignment} - > ${jobd}/seqs.final.fastq

-    REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.r1.final.fastq \
+    REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.final.fastq \
         ${jobd}/reads.r1.fastq ${delimiter} ${r1_tag} &
-    REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.r1.final.fastq \
+    REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.final.fastq \
         ${jobd}/reads.r2.fastq ${delimiter} ${r2_tag} &
     wait

     fastq_pair -t 50000000 ${jobd}/reads.r1.fastq ${jobd}/reads.r2.fastq

     # keep seqs.movi.txt and migrate it to NuQCJob directory.
-    mv ${jobd}/seqs.movi.txt.gz REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/seqs.movi.${SLURM_ARRAY_TASK_ID}.txt.gz
+    mv ${jobd}/seqs.movi.txt.gz REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/logs/seqs.movi.${SLURM_ARRAY_TASK_ID}.txt.gz
 }
 export -f mux-runner
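Note: a rough Python rendering (hypothetical, for illustration only) of what the sed line and the splitter calls above accomplish: each sample's records are tagged with '@<i><delimiter>' while being pooled into one file, and the tag later routes records back to their sample:

    delimiter = '::MUX::'  # hypothetical; the real value is templated in

    def tag(records, i):
        # mimic: sed -r "1~4s/^@(.*)/@<i><delimiter>\1/" on 4-line records
        return [f'@{i}{delimiter}{ln[1:]}' if n % 4 == 0 else ln
                for n, ln in enumerate(records)]

    def untag(records, i):
        # mimic the splitter: keep records carrying tag i, strip the tag
        prefix = f'@{i}{delimiter}'
        out = []
        for n in range(0, len(records), 4):
            rec = records[n:n + 4]
            if rec[0].startswith(prefix):
                out += ['@' + rec[0][len(prefix):]] + rec[1:]
        return out

    rec = ['@read1', 'ACGT', '+', 'IIII']
    assert untag(tag(rec, 1), 1) == rec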