Updates to bring plugin in-line w/mg-scripts #93

Closed
wants to merge 4 commits
2 changes: 1 addition & 1 deletion .github/workflows/qiita-plugin-ci.yml
@@ -79,7 +79,7 @@ jobs:
# seqtk is a conda-installed requirement for metapool and isn't
# installed automatically by its setup.py.
conda config --add channels bioconda
conda create -q --yes -n klp python=3.9 seqtk
conda create -q --yes -n klp python=3.9 'seqtk>=1.4'
conda activate klp

export QIITA_SERVER_CERT=`pwd`/qiita-dev/qiita_core/support_files/server.crt
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -36,7 +36,7 @@ dependencies = [
"pandas",
"qiita-files@https://github.com/qiita-spots/qiita-files/archive/master.zip",
"qiita_client@https://github.com/qiita-spots/qiita_client/archive/master.zip",
"sequence-processing-pipeline@https://github.com/biocore/mg-scripts/archive/master.zip",
"sequence-processing-pipeline@https://github.com/biocore/mg-scripts/archive/master.zip"
]
[project.scripts]
configure_klp = "qp_klp.scripts.configure_klp:config"
9 changes: 5 additions & 4 deletions qp_klp/Step.py
@@ -327,12 +327,13 @@ def _quality_control(self, config, input_file_path):
self.master_qiita_job_id,
config['job_max_array_length'],
config['known_adapters_path'],
config['movi_executable_path'],
config['gres_value'],
config['pmls_path'],
config['additional_fastq_tags'],
bucket_size=config['bucket_size'],
length_limit=config['length_limit'],
cores_per_task=config['cores_per_task'],
movi_path=config['movi_executable_path'],
gres_value=config['gres_value'],
pmls_path=config['pmls_path'])
cores_per_task=config['cores_per_task'])

nuqc_job.run(callback=self.update_callback)
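For orientation: the new positional arguments, including additional_fastq_tags, come straight from the nu-qc section of the active configuration profile (the profile hunks below add the key with an empty default), and a non-empty value such as ['BX'] shows up in the samtools fastq calls of the generated job script (see process_all_fastq_files_w_descr.sh below). The following is a minimal sketch of that flow, assuming a stand-in NuQCJob whose signature is inferred from the call above; arguments not visible in this hunk are omitted, and values marked as placeholders are not taken from the shipped profiles.

```python
# Sketch only: a stand-in NuQCJob with the argument order implied by the call
# above. The real class lives in mg-scripts' sequence_processing_pipeline and
# takes additional arguments that are not visible in this hunk.
class NuQCJob:
    def __init__(self, master_qiita_job_id, job_max_array_length,
                 known_adapters_path, movi_path, gres_value, pmls_path,
                 additional_fastq_tags, bucket_size=None, length_limit=None,
                 cores_per_task=None):
        # tags such as ["BX"] are the values that appear as
        # `samtools fastq ... -T BX` in the generated script
        self.additional_fastq_tags = additional_fastq_tags


# nu-qc section of a configuration profile, trimmed to the keys used above;
# values marked as placeholders are illustrative only.
config = {
    "job_max_array_length": 1000,                                # placeholder
    "known_adapters_path": "fastp_known_adapters_formatted.fna",
    "movi_executable_path": "/home/user/user_dir/Movi/build/movi-default",
    "gres_value": 4,
    "pmls_path": "/home/user/user_dir/human_host_filtration/scripts/"
                 "qiita_filter_pmls.py",
    "additional_fastq_tags": ["BX"],   # [] is the default in the profiles
    "bucket_size": 8,                                            # placeholder
    "length_limit": 100,                                         # placeholder
    "cores_per_task": 4,
}

nuqc_job = NuQCJob("077c4da8-74eb-4184-8860-0207f53623be",
                   config["job_max_array_length"],
                   config["known_adapters_path"],
                   config["movi_executable_path"],
                   config["gres_value"],
                   config["pmls_path"],
                   config["additional_fastq_tags"],
                   bucket_size=config["bucket_size"],
                   length_limit=config["length_limit"],
                   cores_per_task=config["cores_per_task"])
```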

(changes to a configuration profile; filename not shown)
@@ -68,6 +68,7 @@
"cores_per_task": 4,
"movi_executable_path": "/home/user/user_dir/Movi/build/movi-default",
"gres_value": 4,
"additional_fastq_tags": [],
"pmls_path": "/home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py"
},
"seqpro": {
(changes to another configuration profile; filename not shown)
@@ -36,6 +36,7 @@
"cores_per_task": 2,
"movi_executable_path": "/home/user/user_dir/Movi/build/movi-default",
"gres_value": 4,
"additional_fastq_tags": [],
"pmls_path": "/home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py"
},
"seqpro": {
(changes to another configuration profile; filename not shown)
@@ -47,6 +47,7 @@
"cores_per_task": 2,
"movi_executable_path": "/home/user/user_dir/Movi/build/movi-default",
"gres_value": 2,
"additional_fastq_tags": [],
"pmls_path": "/home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py"
},
"seqpro": {
(changes to another configuration profile; filename not shown)
@@ -47,6 +47,7 @@
"cores_per_task": 2,
"movi_executable_path": "/home/user/user_dir/Movi/build/movi-default",
"gres_value": 4,
"additional_fastq_tags": [],
"pmls_path": "/home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py"
},
"seqpro": {
(changes to another configuration profile; filename not shown)
@@ -36,6 +36,7 @@
"cores_per_task": 4,
"movi_executable_path": "/home/user/user_dir/Movi/build/movi-default",
"gres_value": 4,
"additional_fastq_tags": [],
"pmls_path": "/home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py"
},
"seqpro": {
(changes to another configuration profile; filename not shown)
@@ -47,6 +47,7 @@
"cores_per_task": 4,
"movi_executable_path": "/home/user/user_dir/Movi/build/movi-default",
"gres_value": 4,
"additional_fastq_tags": [],
"pmls_path": "/home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py"
},
"seqpro": {
(changes to another configuration profile; filename not shown)
@@ -68,6 +68,7 @@
"cores_per_task": 4,
"movi_executable_path": "/home/user/user_dir/Movi/build/movi-default",
"gres_value": 4,
"additional_fastq_tags": [],
"pmls_path": "/home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py"
},
"seqpro": {
182 changes: 182 additions & 0 deletions qp_klp/tests/data/process_all_fastq_files_w_descr.sh
@@ -0,0 +1,182 @@
#!/bin/bash -l
#SBATCH -J 077c4da8-74eb-4184-8860-0207f53623be_NuQCJob
#SBATCH -p qiita
### wall-time-limit in minutes
#SBATCH --time 2028
#SBATCH --mem 20G
#SBATCH -N 2
### Note: cores_per_task maps to fastp & minimap2 thread counts
### as well as sbatch -c. The demux thread count remains fixed at 1.
### Note: -c was set to 4 and thread counts to 7 during testing.
#SBATCH -c 2
### Commented out for now, but there is a possibility it will be needed
### in the future.
###SBATCH --gres=node_jobs:2


echo "---------------"
echo "Run details:"
echo "$SLURM_JOB_NAME $SLURM_JOB_ID $SLURMD_NODENAME $SLURM_ARRAY_TASK_ID"
echo "---------------"

if [[ -z "${SLURM_ARRAY_TASK_ID}" ]]; then
echo "Not operating within an array"
exit 1
fi
if [[ -z ${PREFIX} ]]; then
echo "PREFIX is not set"
exit 1
fi
if [[ -z ${OUTPUT} ]]; then
echo "OUTPUT is not set"
exit 1
fi

conda activate qp-knight-lab-processing-2022.03
module load fastp_0.20.1 samtools_1.12 minimap2_2.18

set -x
set -e
set -o pipefail

export FILES=$(printf "%s-%d" ${PREFIX} ${SLURM_ARRAY_TASK_ID})
if [[ ! -f ${FILES} ]]; then
logger ${FILES} not found
exit 1
fi
# set a temp directory, make a new unique one under it, and
# make sure we clean up, as we're dumping to shm.
# DO NOT do this casually. Only do a cleanup like this if
# you know for sure TMPDIR is what you want.

WKDIR=${OUTPUT}/
TMPDIR=${OUTPUT}
export TMPDIR=${TMPDIR}
export TMPDIR=$(mktemp -d)
echo $TMPDIR

mkdir -p ${WKDIR}/fastp_reports_dir/html
mkdir -p ${WKDIR}/fastp_reports_dir/json

export ADAPTER_ONLY_OUTPUT=${OUTPUT}/only-adapter-filtered
mkdir -p ${ADAPTER_ONLY_OUTPUT}

function cleanup {
echo "Removing $TMPDIR"
rm -fr $TMPDIR
unset TMPDIR
}
trap cleanup EXIT

export delimiter=::MUX::
export r1_tag=/1
export r2_tag=/2
function mux-runner () {
n=$(wc -l ${FILES} | cut -f 1 -d" ")

jobd=${TMPDIR}
id_map=${jobd}/id_map
seqs_reads=${jobd}/seqs.interleaved.fastq
seq_reads_filter_alignment=${jobd}/seqs.interleaved.filter_alignment.fastq

for i in $(seq 1 ${n})
do
line=$(head -n ${i} ${FILES} | tail -n 1)
r1=$(echo ${line} | cut -f 1 -d" ")
r2=$(echo ${line} | cut -f 2 -d" ")
base=$(echo ${line} | cut -f 3 -d" ")
r1_name=$(basename ${r1} .fastq.gz)
r2_name=$(basename ${r2} .fastq.gz)
r_adapter_only=${ADAPTER_ONLY_OUTPUT}/${r1_name}.interleave.fastq.gz

s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//')
html_name=$(echo "$s_name.html")
json_name=$(echo "$s_name.json")

echo -e "${i}\t${r1_name}\t${r2_name}\t${base}" >> ${id_map}

# movi, in the current version, works on the interleaved version of the
# fwd/rev reads, so we are going to take advantage of fastp's default
# output to minimize steps. Additionally, movi expects the input not to
# be gzipped, so we are not going to compress seqs_r1

fastp \
-l 100 \
-i ${r1} \
-I ${r2} \
-w 2 \
--adapter_fasta fastp_known_adapters_formatted.fna \
--html REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/fastp_reports_dir/html/${html_name} \
--json REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/fastp_reports_dir/json/${json_name} \
--stdout | gzip > ${r_adapter_only}

# multiplex and write adapter filtered data all at once
zcat ${r_adapter_only} | \
sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/" \
>> ${seqs_reads}
done

# minimap/samtools pair commands are now generated in NuQCJob._generate_mmi_filter_cmds()
# and passed to this template.
minimap2 -2 -ax sr -y -t 1 /databases/minimap2/db_1.mmi ${jobd}/seqs.interleaved.fastq -a | samtools fastq -@ 1 -f 12 -F 256 -T BX > ${jobd}/foo
minimap2 -2 -ax sr -y -t 1 /databases/minimap2/db_2.mmi ${jobd}/foo -a | samtools fastq -@ 1 -f 12 -F 256 -T BX > ${jobd}/bar
mv ${jobd}/bar ${jobd}/seqs.interleaved.filter_alignment.fastq
[ -e ${jobd}/foo ] && rm ${jobd}/foo
[ -e ${jobd}/bar ] && rm ${jobd}/bar

/home/user/user_dir/Movi/build/movi-default query \
--index /scratch/movi_hg38_chm13_hprc94 \
--read ${seq_reads_filter_alignment} \
--stdout | gzip > ${jobd}/seqs.movi.txt.gz

python /home/user/user_dir/human_host_filtration/scripts/qiita_filter_pmls.py <(zcat ${jobd}/seqs.movi.txt.gz) | \
seqtk subseq ${seq_reads_filter_alignment} - > ${jobd}/seqs.final.fastq

REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.final.fastq \
${jobd}/reads.r1.fastq ${delimiter} ${r1_tag} &
REMOVED/sequence_processing_pipeline/scripts/splitter ${jobd}/seqs.final.fastq \
${jobd}/reads.r2.fastq ${delimiter} ${r2_tag} &
wait
fastq_pair -t 50000000 ${jobd}/reads.r1.fastq ${jobd}/reads.r2.fastq

# keep seqs.movi.txt and migrate it to the NuQCJob directory.
mv ${jobd}/seqs.movi.txt.gz REMOVED/qp-knight-lab-processing/qp_klp/tests/data/output_dir/NuQCJob/logs/seqs.movi.${SLURM_ARRAY_TASK_ID}.txt.gz
}
export -f mux-runner


function demux-runner () {
n_demux_jobs=${SLURM_CPUS_PER_TASK}
jobd=${TMPDIR}
id_map=${jobd}/id_map
seqs_r1=${jobd}/reads.r1.fastq.paired.fq
seqs_r2=${jobd}/reads.r2.fastq.paired.fq

id_map=${jobd}/id_map
if [[ ! -f ${id_map} ]]; then
echo "No samples..."
return
fi

for idx in $(seq 0 ${n_demux_jobs})
do
python REMOVED/demux \
--id-map ${id_map} \
--infile <(cat ${seqs_r1} ${seqs_r2}) \
--output ${OUTPUT} \
--task ${idx} \
--maxtask ${n_demux_jobs} &
done
wait
}
export -f demux-runner

mux-runner

mkdir -p ${OUTPUT}

echo "$(date) :: demux start"
demux-runner
echo "$(date) :: demux stop"

touch ${OUTPUT}/${SLURM_JOB_NAME}.${SLURM_ARRAY_TASK_ID}.completed
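For readers following the flow above: reads from every fastq pair are pooled into one interleaved stream by prefixing each header with the pair's index and the ::MUX:: delimiter (the sed call in mux-runner), and are split back out by that prefix after filtering (the splitter and demux steps). Below is a small illustrative Python sketch of that header round trip only; the actual work is done by the sed, splitter, and demux commands referenced in the script.

```python
# Illustrative only: the ::MUX:: header-tagging round trip used by the script
# above. Real processing is done by sed (tagging) and splitter/demux (untagging).
DELIMITER = "::MUX::"


def mux_header(header, pair_index):
    # mimics: sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/"
    assert header.startswith("@")
    return f"@{pair_index}{DELIMITER}{header[1:]}"


def demux_header(tagged):
    # invert mux_header: recover the pair index and the original header
    prefix, original = tagged[1:].split(DELIMITER, 1)
    return int(prefix), "@" + original


tagged = mux_header("@read001/1", 3)
assert tagged == "@3::MUX::read001/1"
assert demux_header(tagged) == (3, "@read001/1")
```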
40 changes: 33 additions & 7 deletions qp_klp/tests/test_metagenomic_step.py
@@ -100,6 +100,31 @@ def _generate_fake_fastq_files(self, output_path, file_count,
return return_these

def test_metagenomic_quality_control(self):
# depending on the addition of optional fastq descriptions in
# the configuration of NuQCJob w/in Step.quality_control(), the
# resulting job script will be slightly different. Since this
# parameter is passed to NuQCJob() directly from the configuration
# profile, the test code remains the same in both cases. To test
# both paths we must vary the configuration file and the expected
# output file.

# test w/out optional fastq descriptions specified.
exp_output_path = join(self.process_shell_script,
"process_all_fastq_files.sh")
self.metagenomic_quality_control_exam(exp_output_path)

def test_metagenomic_quality_control_w_descr(self):
# test w/optional fastq descriptions.

# rather than direct Step to use a different configuration file just
# to change one value, we will modify the value in place.
ref = self.pipeline.config_profile['profile']['configuration']
ref['nu-qc']['additional_fastq_tags'] = ['BX']
exp_output_path = join(self.process_shell_script,
"process_all_fastq_files_w_descr.sh")
self.metagenomic_quality_control_exam(exp_output_path)

def metagenomic_quality_control_exam(self, exp_output_path):
self._create_test_input(2)

metadata = {'NYU_BMS_Melanoma_13059': {'needs_filtering': False,
@@ -142,28 +167,29 @@ def test_metagenomic_quality_control(self):
'.trimmed.fastq.gz'))
copyfile(fp, file_path)

# Since the 'sbatch' and 'sacct' commands don't exist in the testing
# Since the 'sbatch' and 'squeue' commands don't exist in the testing
# environment, fake them with stand-in scripts that write to stdout
# the metadata needed to keep run() working as intended. These faked
# binary files overwrite the versions created by test_step.py and are
# automatically deleted after testing. test_step.py sets chmod for us.
with open(join(dirname(sys.executable), 'sbatch'), 'w') as f:
# code will extract 777 for use as the fake job-id in slurm.
f.write("echo Hello 777")
# code will extract 9999999 for use as the fake job-id in slurm.
f.write("echo 'Submitted batch job 9999999'")

with open(join(dirname(sys.executable), 'sacct'), 'w') as f:
# fake sacct will return job-id 777 completed successfully.
with open(join(dirname(sys.executable), 'squeue'), 'w') as f:
# fake squeue will return job-id 9999999 completed successfully.
# faked output files created in test method() will generate
# faked results.
f.write("echo \"777|cc_fake_job.sh|COMPLETED|00:10:00|0:0\"")
f.write("echo \"ARRAY_JOB_ID,JOBID,STATE\n9999999,9999999,"
"COMPLETED\"")

# execute the quality_control() method, which will in turn call NuQC's
# run() method.
step.quality_control()

# after step.quality_control() executes, process_all_fastq_files.sh
# should be created. Confirm the output of this file is as expected.
with open(self.process_shell_script, 'r') as f:
with open(exp_output_path, 'r') as f:
exp = f.readlines()
exp = [line.rstrip() for line in exp]
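These tests pass because the fake Slurm binaries only have to emit the strings the submission and polling code looks for: a "Submitted batch job <id>" line from sbatch, and a CSV-style squeue listing that reports the array job as COMPLETED. A sketch of the same idea as a small helper follows; the helper name and layout are illustrative (the plugin's own _create_fake_bin in test_step.py is referenced below, but its body is not shown in this diff).

```python
import os
import stat
import sys
from os.path import dirname, join


def create_fake_bin(name, stdout_text):
    """Write a tiny shell script next to the Python interpreter so it shadows
    the real Slurm binary on PATH while the tests run."""
    path = join(dirname(sys.executable), name)
    with open(path, 'w') as f:
        f.write("#!/bin/sh\n")
        f.write(f"echo \"{stdout_text}\"\n")
    os.chmod(path, os.stat(path).st_mode | stat.S_IEXEC)
    return path


# fake submission: callers extract 9999999 as the Slurm job-id
create_fake_bin('sbatch', "Submitted batch job 9999999")
# fake polling: report the array job as COMPLETED
create_fake_bin('squeue', "ARRAY_JOB_ID,JOBID,STATE\n9999999,9999999,COMPLETED")
```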

8 changes: 3 additions & 5 deletions qp_klp/tests/test_step.py
@@ -238,13 +238,13 @@ def setUp(self):
self.good_sample_sheet_path = cc_path('good-sample-sheet.csv')
self.another_good_sample_sheet_path = cc_path('another-good-sample-'
'sheet.csv')
self.process_shell_script = cc_path()
self.sheet_w_replicates_path = cc_path('good_sheet_w_replicates.csv')
self.good_mapping_file_path = cc_path('good-mapping-file.txt')
self.good_prep_info_file_path = cc_path('good-sample-prep.tsv')
self.good_transcript_sheet_path = cc_path('good-sample-sheet-'
'transcriptomics.csv')
self.output_file_path = cc_path('output_dir')
self.process_shell_script = cc_path('process_all_fastq_files.sh')
self.master_config_path = cc_path('configuration.json')
self.dummy_fastq_file = cc_path('dummy.fastq.gz')
self.mini_sheet_path = cc_path('mini-sample-sheet.csv')
@@ -336,10 +336,8 @@ def _create_test_input(self, stage):
self._create_fake_bin('sbatch', "echo 'Submitted "
"batch job 9999999'")

self._create_fake_bin('sacct', "echo '9999999|99999999-9999-9999"
"-9999-999999999999.txt|COMPLETED"
"|09:53:41|0:0'")

self._create_fake_bin('squeue', "echo 'ARRAY_JOB_ID,JOBID,STATE\n"
"9999999,9999999,COMPLETED'")
if stage >= 2:
# generate dummy fastq files in ConvertJob and create an empty
# NuQCJob directory to use for testing NuQCJob initialization.
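As context for the dummy inputs mentioned above: the fixture fastq files only need to be syntactically valid (and long enough to clear fastp's -l 100 cutoff) for NuQCJob initialization and the generated script to be exercised. A minimal sketch of producing such a fixture follows; this is not the plugin's _generate_fake_fastq_files helper, and the output path and read length are illustrative.

```python
import gzip
import os


def write_dummy_fastq(path, n_records=4, read_len=100):
    """Write a tiny, syntactically valid gzipped FASTQ file for test fixtures."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with gzip.open(path, 'wt') as f:
        for i in range(n_records):
            f.write(f"@dummy_read_{i}/1\n")
            f.write("ACGT" * (read_len // 4) + "\n")  # 100 bp clears fastp -l 100
            f.write("+\n")
            f.write("I" * read_len + "\n")            # uniform high quality


# illustrative path; the real tests write into the ConvertJob output directory
write_dummy_fastq('ConvertJob/dummy_sample_R1_001.trimmed.fastq.gz')
```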