Updates based on initial runs in production. (#89)
* Updates based on initial runs in production.

* Updates from production. Tests updated.
charles-cowart authored Jun 26, 2024
1 parent 4f2f4d5 commit 99e627b
Showing 6 changed files with 73 additions and 47 deletions.
62 changes: 44 additions & 18 deletions qp_klp/Step.py
@@ -1,9 +1,8 @@
from itertools import chain
from collections import defaultdict
from json import dumps, load
from metapool import load_sample_sheet
from os import makedirs, walk, listdir
from os.path import join, exists, split, basename, dirname
from os.path import join, exists, split, basename, dirname, abspath
from sequence_processing_pipeline.ConvertJob import ConvertJob
from sequence_processing_pipeline.FastQCJob import FastQCJob
from sequence_processing_pipeline.GenPrepFileJob import GenPrepFileJob
@@ -380,11 +379,6 @@ def _generate_prep_file(self, config, input_file_path, seqpro_path):

gpf_job.run(callback=self.update_callback)

# concatenate the lists of paths across all study_ids into a single
# list. Replace sample-names w/tube-ids in all relevant prep-files.
preps = list(chain.from_iterable(gpf_job.prep_file_paths.values()))
self._overwrite_prep_files(preps)

return gpf_job

def _helper_process_fastp_report_dirs(self):
@@ -727,6 +721,7 @@ def _get_tube_ids_from_qiita(self, qclient):

# use empty dict {} as an indication that get_tube_ids_from_qiita was
# called but no tube-ids were found for any project.
# to clarify, self.tube_id_map maps sample-names to tube-ids.
self.tube_id_map = tids_by_qiita_id
# should samples_in_qiita be none if tube_id_map is not?
self.samples_in_qiita = sample_names_by_qiita_id
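
For reference, a minimal sketch of the shapes these two attributes are expected to hold, inferred from how they are used later in this diff; the qiita-ids, sample-names, and tube-ids below are made up and not taken from the commit:

# hypothetical illustration only -- shapes inferred from later usage
# tube_id_map: qiita-id -> {sample-name: tube-id}
tube_id_map = {
    '13059': {'sample.1': 'tube.A1', 'sample.2': 'tube.A2'},
}

# samples_in_qiita: qiita-id -> sample-names registered in Qiita
samples_in_qiita = {
    '13059': ['sample.1', 'sample.2'],
}

# per the comment above, an empty dict signals that the lookup ran but
# found no tube-ids for any project; None means it was never called.
tube_id_map_when_nothing_found = {}
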
@@ -860,7 +855,10 @@ def _process_tube_ids(self, project_name, qiita_id, samples):
# return None otherwise

@classmethod
def _replace_with_tube_ids(cls, prep_file_path, tube_id_map):
def _replace_tube_ids_w_sample_names(cls, prep_file_path, tube_id_map):
# reversed_map maps tube-ids to sample-names
reversed_map = {tube_id_map[k]: k for k in tube_id_map}

# passing tube_id_map as a parameter allows for easier testing.
df = pd.read_csv(prep_file_path, sep='\t', dtype=str, index_col=False)
# save copy of sample_name column as 'old_sample_name'
@@ -874,16 +872,13 @@ def _replace_with_tube_ids(cls, prep_file_path, tube_id_map):
# remove leading zeroes if they exist to match Qiita results.
sample_name = sample_name.lstrip('0')

reversed_map = {tube_id_map[k]: k for k in tube_id_map}
if sample_name in reversed_map:
df.at[i, "sample_name"] = reversed_map[sample_name]

df.to_csv(prep_file_path, index=False, sep="\t")

def _overwrite_prep_files(self, prep_file_paths):
# replaces sample-names in prep-files with tube-ids according to
# a dict with project-names as keys and another dict as a value.
# this dict uses sample-names as keys and tube-ids as values.
# replace tube-ids in prep-info files w/sample-names.
if self.tube_id_map is None:
raise ValueError("get_tube_ids_from_qiita() was not called")

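Because only fragments of the renamed method are visible in this hunk, here is a self-contained sketch of the replacement it performs. It is an approximation under the assumptions visible above (tube_id_map maps sample-name to tube-id for a single project), not the method as committed:

import pandas as pd

def replace_tube_ids_w_sample_names(prep_file_path, tube_id_map):
    # invert the per-project map so tube-ids can be looked up directly
    reversed_map = {tube_id: name for name, tube_id in tube_id_map.items()}

    df = pd.read_csv(prep_file_path, sep='\t', dtype=str, index_col=False)

    # keep a copy of the original column, as the committed code does
    df['old_sample_name'] = df['sample_name']

    for i, value in df['sample_name'].items():
        # strip leading zeroes so the value matches what Qiita reports
        value = value.lstrip('0')
        if value in reversed_map:
            df.at[i, 'sample_name'] = reversed_map[value]

    df.to_csv(prep_file_path, sep='\t', index=False)

As the next hunk shows, _overwrite_prep_files() now calls this once per matching prep-file, passing self.tube_id_map[qiita_id] for that project.
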
@@ -905,12 +900,10 @@ def _overwrite_prep_files(self, prep_file_paths):
if len(matching_files) == 0:
continue

if len(matching_files) > 1:
raise ValueError("More than one match found for project "
f"'{fqp_name}': {str(matching_files)}")

Step._replace_with_tube_ids(matching_files[0],
self.tube_id_map[qiita_id])
for matching_file in matching_files:
Step._replace_tube_ids_w_sample_names(matching_file,
self.tube_id_map[
qiita_id])

def update_blanks_in_qiita(self, qclient):
for sif_path in self.sifs:
@@ -1010,6 +1003,39 @@ def execute_pipeline(self, qclient, increment_status, update=True,
if "GenPrepFileJob" not in skip_steps:
self.generate_prep_file()

# moved final component of genprepfilejob outside of object.
# obtain the paths to the prep-files generated by GenPrepFileJob
# w/out having to recover full state.
tmp = join(self.pipeline.output_path, 'GenPrepFileJob', 'PrepFiles')

self.has_replicates = False

prep_paths = []
self.prep_file_paths = {}

for root, dirs, files in walk(tmp):
for _file in files:
# break up the prep-info-file into segments
# (run-id, project_qid, other) and cleave
# the qiita-id from the project_name.
qid = _file.split('.')[1].split('_')[-1]

if qid not in self.prep_file_paths:
self.prep_file_paths[qid] = []

_path = abspath(join(root, _file))
if _path.endswith('.tsv'):
prep_paths.append(_path)
self.prep_file_paths[qid].append(_path)

for _dir in dirs:
if _dir == '1':
# if PrepFiles contains the '1' directory, then it's a
# given that this sample-sheet contains replicates.
self.has_replicates = True

self._overwrite_prep_files(prep_paths)

# for now, simply re-run any line below as if it was a new job, even
# for a restart. functionality is idempotent, except for the
# registration of new preps in Qiita. These will simply be removed
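
As a worked example of the path-collection block added above (the file name and the helper wrapper are hypothetical; only the split expression comes from the diff):

from os import walk
from os.path import abspath, join

# prep-info files are named <run-id>.<project>_<qiita-id>.<other>, so the
# qiita-id can be cleaved from the second dot-separated segment:
fname = '230101_A00953_0123_BHJ23KDRX2.MyProject_13059.tsv'   # made-up name
qid = fname.split('.')[1].split('_')[-1]                      # -> '13059'

def collect_prep_files(prep_files_root):
    # sketch of the loop above: group absolute .tsv paths by qiita-id and
    # flag replicates when a '1' subdirectory exists under PrepFiles.
    prep_file_paths, prep_paths, has_replicates = {}, [], False
    for root, dirs, files in walk(prep_files_root):
        for _file in files:
            _path = abspath(join(root, _file))
            if _path.endswith('.tsv'):
                qid = _file.split('.')[1].split('_')[-1]
                prep_paths.append(_path)
                prep_file_paths.setdefault(qid, []).append(_path)
        if '1' in dirs:
            has_replicates = True
    return prep_file_paths, prep_paths, has_replicates
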
2 changes: 2 additions & 0 deletions qp_klp/klp.py
@@ -180,6 +180,8 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
if exists(join(out_dir, 'GenPrepFileJob')):
skip_steps.append('FastQCJob')

# it doesn't matter if cmds.log is a valid cmds.log or just
# an empty file. The cmds.log will get overwritten downstream.
if exists(join(out_dir, 'cmds.log')):
skip_steps.append('GenPrepFileJob')

@@ -92,4 +92,4 @@
}
}
}
}
}
@@ -60,4 +60,4 @@
}
}
}
}
}
@@ -92,4 +92,4 @@
}
}
}
}
}
50 changes: 24 additions & 26 deletions qp_klp/tests/data/process_all_fastq_files.sh
@@ -9,7 +9,9 @@
### as well as sbatch -c. demux threads remains fixed at 1.
### Note -c set to 4 and thread counts set to 7 during testing.
#SBATCH -c 2
#SBATCH --gres=node_jobs:2
### Commented out for now, but there is a possibility it will be needed
### in the future.
###SBATCH --gres=node_jobs:2


echo "---------------"
@@ -53,8 +55,8 @@ export TMPDIR=${TMPDIR}
export TMPDIR=$(mktemp -d)
echo $TMPDIR

mkdir -p ${WKDIR}NuQCJob/fastp_reports_dir/html
mkdir -p ${WKDIR}NuQCJob/fastp_reports_dir/json
mkdir -p ${WKDIR}/fastp_reports_dir/html
mkdir -p ${WKDIR}/fastp_reports_dir/json

export ADAPTER_ONLY_OUTPUT=${OUTPUT}/only-adapter-filtered
mkdir -p ${ADAPTER_ONLY_OUTPUT}
@@ -74,9 +76,8 @@ function mux-runner () {

jobd=${TMPDIR}
id_map=${jobd}/id_map
seqs_r1=${jobd}/seqs.r1.fastq.gz
seqs_r2=${jobd}/seqs.r2.fastq
r1_filt=${jobd}/seqs.r1.adapter-removed.fastq.gz
seqs_reads=${jobd}/seqs.interleaved.fastq
seq_reads_filter_alignment=${jobd}/seqs.interleaved.filter_alignment.fastq

for i in $(seq 1 ${n})
do
@@ -86,14 +87,19 @@
base=$(echo ${line} | cut -f 3 -d" ")
r1_name=$(basename ${r1} .fastq.gz)
r2_name=$(basename ${r2} .fastq.gz)
r1_adapter_only=${ADAPTER_ONLY_OUTPUT}/${r1_name}.fastq.gz
r_adapter_only=${ADAPTER_ONLY_OUTPUT}/${r1_name}.interleave.fastq.gz

s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//')
html_name=$(echo "$s_name.html")
json_name=$(echo "$s_name.json")

echo -e "${i}\t${r1_name}\t${r2_name}\t${base}" >> ${id_map}

# movi, in the current version, works on the interleaved version of the
# fwd/rev reads, so we take advantage of fastp's default output
# to minimize steps. Additionally, movi expects the input to not be
# gzipped, so we do not compress seqs_r1

fastp \
-l 100 \
-i ${r1} \
@@ -102,47 +108,39 @@
--adapter_fasta fastp_known_adapters_formatted.fna \
--html REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/fastp_reports_dir/html/${html_name} \
--json REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/fastp_reports_dir/json/${json_name} \
--stdout | gzip > ${r1_filt}
--stdout | gzip > ${r_adapter_only}

# multiplex and write adapter filtered data all at once
zcat ${r1_filt} | \
zcat ${r_adapter_only} | \
sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/" \
>> ${seqs_r1}
cat ${r1_filt} | \
gzip -c > ${r1_adapter_only} &
wait

rm ${r1_filt} &
wait
>> ${seqs_reads}
done

# minimap/samtools pair commands are now generated in NuQCJob._generate_mmi_filter_cmds()
# and passed to this template. This method assumes ${jobd} is the correct location to
# filter files, the initial file is "${jobd}/seqs.r1.fastq"), and the output name is
# "${jobd}/seqs.r1.ALIGN.fastq".
minimap2 -2 -ax sr -t 1 /databases/minimap2/db_1.mmi ${jobd}/seqs.r1.fastq -a | samtools fastq -@ 1 -f 12 -F 256 > ${jobd}/foo
# and passed to this template.
minimap2 -2 -ax sr -t 1 /databases/minimap2/db_1.mmi ${jobd}/seqs.interleaved.fastq -a | samtools fastq -@ 1 -f 12 -F 256 > ${jobd}/foo
minimap2 -2 -ax sr -t 1 /databases/minimap2/db_2.mmi ${jobd}/foo -a | samtools fastq -@ 1 -f 12 -F 256 > ${jobd}/bar
mv ${jobd}/bar ${jobd}/seqs.r1.ALIGN.fastq
mv ${jobd}/bar ${jobd}/seqs.interleaved.filter_alignment.fastq
[ -e ${jobd}/foo ] && rm ${jobd}/foo
[ -e ${jobd}/bar ] && rm ${jobd}/bar

/home/user/user_dir/Movi/build/movi-default query \
--index /scratch/movi_hg38_chm13_hprc94 \
--read <(zcat ${jobd}/seqs.r1.ALIGN.fastq.gz) \
--read ${seq_reads_filter_alignment} \
--stdout | gzip > ${jobd}/seqs.movi.txt.gz

python /home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py <(zcat ${jobd}/seqs.movi.txt.gz) | \
seqtk subseq ${jobd}/seqs.r1.ALIGN.fastq.gz - | gzip > ${jobd}/seqs.r1.final.fastq.gz
seqtk subseq ${seq_reads_filter_alignment} - > ${jobd}/seqs.final.fastq

REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.r1.final.fastq \
REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.final.fastq \
${jobd}/reads.r1.fastq ${delimiter} ${r1_tag} &
REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.r1.final.fastq \
REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.final.fastq \
${jobd}/reads.r2.fastq ${delimiter} ${r2_tag} &
wait
fastq_pair -t 50000000 ${jobd}/reads.r1.fastq ${jobd}/reads.r2.fastq

# keep seqs.movi.txt and migrate it to NuQCJob directory.
mv ${jobd}/seqs.movi.txt.gz REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/seqs.movi.${SLURM_ARRAY_TASK_ID}.txt.gz
mv ${jobd}/seqs.movi.txt.gz REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/logs/seqs.movi.${SLURM_ARRAY_TASK_ID}.txt.gz
}
export -f mux-runner

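For readers unfamiliar with the multiplexing trick used above, here is a small Python rendering of the header tagging that the sed command applies before reads from several samples are pooled into one interleaved file. The real work is done by sed and the compiled splitter tool; the delimiter value shown is hypothetical:

def tag_fastq_headers(lines, sample_index, delimiter='::'):
    # prepend '<sample_index><delimiter>' to every FASTQ header line
    # (every 4th line, starting at line 1), mirroring
    # sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/"
    tagged = []
    for lineno, line in enumerate(lines):
        if lineno % 4 == 0 and line.startswith('@'):
            line = '@' + str(sample_index) + delimiter + line[1:]
        tagged.append(line)
    return tagged

record = ['@A00953:123:HJ23KDRX2:1:1101:1000:1000 1:N:0:ACGT',
          'ACGTACGTACGT', '+', 'FFFFFFFFFFFF']
print(tag_fastq_headers(record, 3)[0])
# -> '@3::A00953:123:HJ23KDRX2:1:1101:1000:1000 1:N:0:ACGT'

# after host filtering, the splitter uses these prefixes (together with the
# r1/r2 tags) to route reads back out of the pooled file.
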