Skip to content

Commit

Permalink
Added variant callers for benchmarking (resolves BD2KGenomics#97)
Browse files Browse the repository at this point in the history
Added:

* FreeBayes
* Platypus
* 16GT
* Strelka
* Samtools Mpileup/BCFtools call
  • Loading branch information
fnothaft committed Aug 14, 2017
1 parent 12e2fa9 commit 2929f96
Showing 1 changed file with 291 additions and 0 deletions.
291 changes: 291 additions & 0 deletions src/toil_lib/tools/variant_callers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
import logging
import os
import time

from toil.lib.docker import dockerCall

from toil_lib.tools import log_runtime

_log = logging.getLogger(__name__)

def run_freebayes(job, ref, ref_fai, bam, bai, chunksize=None):
    '''
    Calls FreeBayes to call variants.

    If job.cores > 1 and chunksize is not None, then runs FreeBayes in
    parallel mode. If not, then runs single threaded FreeBayes.

    :param JobFunctionWrappingJob job: passed automatically by Toil.
    :param str ref: The reference FASTA FileStoreID.
    :param str ref_fai: The reference FASTA index FileStoreID.
    :param str bam: The FileStoreID of the BAM to call.
    :param str bai: The FileStoreID of the BAM index.
    :param int chunksize: The size of chunks to split into and call in parallel.
      Defaults to None.
    :return: The FileStoreID of the called VCF.
    :rtype: str
    '''
    work_dir = job.fileStore.getLocalTempDir()
    file_ids = [ref, ref_fai, bam, bai]
    file_names = ['ref.fa', 'ref.fa.fai', 'sample.bam', 'sample.bam.bai']
    for file_store_id, name in zip(file_ids, file_names):
        job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))

    output_vcf = os.path.join(work_dir, 'sample.vcf')

    if job.cores > 1 and chunksize:
        _log.info('Running FreeBayes parallel with %dbp chunks and %d cores',
                  chunksize,
                  job.cores)

        # First, generate the list of regions to call in parallel. The docker
        # entrypoint is overridden to run the helper script shipped with
        # FreeBayes; its stdout (the region list) is captured into a file.
        docker_parameters = ['--rm',
                             '--log-driver', 'none',
                             '-v', '{}:/data'.format(work_dir),
                             '--entrypoint=/opt/cgl-docker-lib/freebayes/scripts/fasta_generate_regions.py']
        start_time = time.time()
        # Open the capture file for WRITING and close it deterministically.
        # The original code used open() in default read mode, which raises
        # IOError because the file does not exist yet, and also leaked the
        # file handle.
        with open(os.path.join(work_dir, 'regions'), 'w') as regions_file:
            dockerCall(job,
                       workDir=work_dir,
                       dockerParameters=docker_parameters,
                       parameters=['/data/ref.fa.fai', str(chunksize)],
                       tool='quay.io/ucsc_cgl/freebayes',
                       outfile=regions_file)
        end_time = time.time()
        log_runtime(job, start_time, end_time, 'fasta_generate_regions')

        # Then, run freebayes-parallel across the generated regions, using
        # one worker per available core.
        docker_parameters = ['--rm',
                             '--log-driver', 'none',
                             '-v', '{}:/data'.format(work_dir),
                             '--entrypoint=/opt/cgl-docker-lib/freebayes/scripts/freebayes-parallel']
        start_time = time.time()
        with open(output_vcf, 'w') as vcf_file:
            dockerCall(job=job,
                       workDir=work_dir,
                       dockerParameters=docker_parameters,
                       parameters=['/data/regions',
                                   str(job.cores),
                                   '-f', '/data/ref.fa',
                                   '/data/sample.bam'],
                       tool='quay.io/ucsc_cgl/freebayes',
                       outfile=vcf_file)
        end_time = time.time()
        log_runtime(job, start_time, end_time, 'FreeBayes Parallel')

    else:
        _log.info('Running FreeBayes single threaded.')

        start_time = time.time()
        # Same fix as above: capture FreeBayes' stdout into a writable handle.
        with open(output_vcf, 'w') as vcf_file:
            dockerCall(job=job,
                       workDir=work_dir,
                       parameters=['-f', '/data/ref.fa',
                                   '/data/sample.bam'],
                       tool='quay.io/ucsc_cgl/freebayes',
                       outfile=vcf_file)
        end_time = time.time()
        log_runtime(job, start_time, end_time, 'FreeBayes')

    return job.fileStore.writeGlobalFile(output_vcf)


def run_platypus(job, ref, ref_fai, bam, bai, assemble=False):
    '''
    Runs Platypus to call variants.

    :param JobFunctionWrappingJob job: passed automatically by Toil.
    :param str ref: The reference FASTA FileStoreID.
    :param str ref_fai: The reference FASTA index FileStoreID.
    :param str bam: The FileStoreID of the BAM to call.
    :param str bai: The FileStoreID of the BAM index.
    :param boolean assemble: If true, runs Platypus in assembler mode.
    :return: The FileStoreID of the called VCF.
    :rtype: str
    '''
    work_dir = job.fileStore.getLocalTempDir()

    # Localize the inputs under the names the container expects.
    inputs = [('ref.fa', ref),
              ('ref.fa.fai', ref_fai),
              ('sample.bam', bam),
              ('sample.bam.bai', bai)]
    for name, file_store_id in inputs:
        job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))

    platypus_args = ['callVariants',
                     '--refFile', '/data/ref.fa',
                     '--outputFile', '/data/sample.vcf',
                     '--bamFiles', '/data/sample.bam']
    # Only pass --nCPU when more than one core was allocated.
    if job.cores > 1:
        platypus_args += ['--nCPU', str(job.cores)]
    if assemble:
        platypus_args += ['--assemble']

    start_time = time.time()
    dockerCall(job=job,
               workDir=work_dir,
               parameters=platypus_args,
               tool='quay.io/ucsc_cgl/platypus')
    end_time = time.time()
    log_runtime(job, start_time, end_time, 'Platypus, assemble={}'.format(assemble))

    return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.vcf'))


def run_16gt(job, ref, genome_index, bam, dbsnp, sample_name):
    '''
    Generates the snapshot file and calls variants using 16GT.

    Runs four stages in the 16GT container: bam2snapshot, snapshotSnpcaller,
    txt2vcf (stdout captured to sample.vcf), and filterVCF (stdout captured
    to sample.filtered.vcf).

    :param JobFunctionWrappingJob job: passed automatically by Toil.
    :param str ref: The reference FASTA FileStoreID.
    :param str genome_index: The FileStoreID of the SOAP3-dp genome index.
    :param str bam: The FileStoreID of the BAM to call.
    :param str dbsnp: The FileStoreID of the dbSNP VCF for filtration.
    :param str sample_name: The name of the sample being called.
    :return: The FileStoreID of the filtered VCF.
    :rtype: str
    '''
    work_dir = job.fileStore.getLocalTempDir()
    file_ids = [ref, genome_index, bam, dbsnp]
    file_names = ['ref.fa', 'ref.fa.index', 'sample.bam', 'dbsnp.vcf']
    for file_store_id, name in zip(file_ids, file_names):
        job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))

    start_time = time.time()
    dockerCall(job=job,
               workDir=work_dir,
               parameters=['/opt/cgl-docker-lib/16GT/bam2snapshot',
                           '-i', '/data/ref.fa.index',
                           '-b', '/data/sample.bam',
                           '-o', '/data/sample'],
               tool='quay.io/ucsc_cgl/16gt')
    end_time = time.time()
    log_runtime(job, start_time, end_time, '16gt bam2snapshot')

    start_time = time.time()
    dockerCall(job=job,
               workDir=work_dir,
               parameters=['/opt/cgl-docker-lib/16GT/snapshotSnpcaller',
                           '-i', '/data/ref.fa.index',
                           '-o', '/data/sample'],
               tool='quay.io/ucsc_cgl/16gt')
    end_time = time.time()
    log_runtime(job, start_time, end_time, '16gt snapshotSnpcaller')

    start_time = time.time()
    # Open the capture file for WRITING and close it deterministically. The
    # original code used open() in default read mode, which raises IOError
    # because the file does not exist yet, and also leaked the handle.
    with open(os.path.join(work_dir, 'sample.vcf'), 'w') as vcf_file:
        dockerCall(job=job,
                   workDir=work_dir,
                   parameters=['perl', '/opt/cgl-docker-lib/16GT/txt2vcf.pl',
                               '/data/sample',
                               sample_name,
                               '/data/ref.fa'],
                   tool='quay.io/ucsc_cgl/16gt',
                   outfile=vcf_file)
    end_time = time.time()
    log_runtime(job, start_time, end_time, '16gt txt2vcf')

    start_time = time.time()
    # Same fix as above for the filtration stage's captured stdout.
    with open(os.path.join(work_dir, 'sample.filtered.vcf'), 'w') as filtered_file:
        dockerCall(job=job,
                   workDir=work_dir,
                   parameters=['perl', '/opt/cgl-docker-lib/16GT/filterVCF.pl',
                               '/data/sample.vcf',
                               '/data/dbsnp.vcf'],
                   tool='quay.io/ucsc_cgl/16gt',
                   outfile=filtered_file)
    end_time = time.time()
    log_runtime(job, start_time, end_time, '16gt filterVCF')

    return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.filtered.vcf'))


def run_strelka(job, ref, bam, bai, candidate_indels=None):
    '''
    Runs Strelka's germline single sample caller.

    :param JobFunctionWrappingJob job: passed automatically by Toil.
    :param str ref: The reference FASTA FileStoreID.
    :param str bam: The FileStoreID of the BAM to call.
    :param str bai: The FileStoreID of the BAM index.
    :param str candidate_indels: The optional FileStoreID of the candidate
      indel GZIPed VCF.
    :return: The FileStoreID of the called, GZIPed VCF.
    :rtype: str
    '''
    # NOTE(review): Strelka's configure step requires the reference FASTA to
    # be faidx-indexed (ref.fa.fai), and --indelCandidates requires a .tbi
    # index alongside the gzipped VCF. Neither index is localized here —
    # confirm the callers stage them or that the image generates them.
    generate_parameters = ['/opt/strelka/bin/configureStrelkaGermlineWorkflow.py',
                           '--bam', '/data/sample.bam',
                           '--referenceFasta', '/data/ref.fa',
                           '--runDir', '/data/']
    work_dir = job.fileStore.getLocalTempDir()
    file_ids = [ref, bam, bai]
    file_names = ['ref.fa', 'sample.bam', 'sample.bam.bai']

    if candidate_indels:
        _log.info('Candidate indels from Manta were provided for Strelka.')
        file_ids.append(candidate_indels)
        file_names.append('candidateSmallIndels.vcf.gz')
        generate_parameters.extend(['--indelCandidates',
                                    '/data/candidateSmallIndels.vcf.gz'])
    else:
        _log.info('No candidate indels provided.')

    for file_store_id, name in zip(file_ids, file_names):
        job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))

    # Stage 1: generate the run directory and runWorkflow.py script.
    start_time = time.time()
    dockerCall(job=job,
               workDir=work_dir,
               parameters=generate_parameters,
               tool='quay.io/ucsc_cgl/strelka')
    end_time = time.time()
    log_runtime(job, start_time, end_time, 'Configuring Strelka')

    # Stage 2: execute the configured workflow locally across job.cores.
    start_time = time.time()
    dockerCall(job=job,
               workDir=work_dir,
               parameters=['/data/runWorkflow.py',
                           '-m', 'local',
                           '-j', str(job.cores)],
               tool='quay.io/ucsc_cgl/strelka')
    end_time = time.time()
    log_runtime(job, start_time, end_time, 'Strelka')

    # Strelka writes its germline output under <runDir>/results/variants/,
    # not at the top of the run directory. The original looked for
    # variants.vcf.gz directly in work_dir, a path the workflow never creates.
    return job.fileStore.writeGlobalFile(
        os.path.join(work_dir, 'results', 'variants', 'variants.vcf.gz'))


def run_samtools_mpileup(job, ref, ref_fai, bam, bai):
    '''
    Runs samtools mpileup to compute genotype likelihoods in VCF format.

    The output is intended to be consumed by run_bcftools_call, which
    performs the actual variant calling.

    :param JobFunctionWrappingJob job: passed automatically by Toil.
    :param str ref: The reference FASTA FileStoreID.
    :param str ref_fai: The reference FASTA index FileStoreID.
    :param str bam: The FileStoreID of the BAM to call.
    :param str bai: The FileStoreID of the BAM index.
    :return: The FileStoreID of the GZIPed VCF of genotype likelihoods.
    :rtype: str
    '''
    work_dir = job.fileStore.getLocalTempDir()
    file_ids = [ref, ref_fai, bam, bai]
    file_names = ['ref.fa', 'ref.fa.fai', 'sample.bam', 'sample.bam.bai']
    for file_store_id, name in zip(file_ids, file_names):
        job.fileStore.readGlobalFile(file_store_id, os.path.join(work_dir, name))

    start_time = time.time()
    dockerCall(job=job,
               workDir=work_dir,
               parameters=['mpileup',
                           # -v: emit genotype likelihoods as VCF. Without
                           # -v (or -g), mpileup writes a textual pileup,
                           # which contradicts the .vcf.gz output name and
                           # cannot be parsed by the downstream
                           # run_bcftools_call step.
                           '-v',
                           '-f', '/data/ref.fa',
                           '-o', '/data/sample.vcf.gz',
                           '/data/sample.bam'],
               tool='quay.io/ucsc_cgl/samtools')
    end_time = time.time()
    log_runtime(job, start_time, end_time, 'samtools mpileup')

    return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.vcf.gz'))


def run_bcftools_call(job, vcf_gz):
    '''
    Runs the bcftools call command on a VCF of genotype likelihoods.

    :param JobFunctionWrappingJob job: passed automatically by Toil.
    :param str vcf_gz: The FileStoreID of the GZIPed VCF (e.g., produced by
      run_samtools_mpileup).
    :return: The FileStoreID of the GZIPed VCF of calls.
    :rtype: str
    '''
    work_dir = job.fileStore.getLocalTempDir()
    job.fileStore.readGlobalFile(vcf_gz, os.path.join(work_dir, 'sample.vcf.gz'))

    start_time = time.time()
    dockerCall(job=job,
               workDir=work_dir,
               parameters=['call',
                           # -m: use the multiallelic/rare-variant calling
                           # model. bcftools call errors out unless one of
                           # -m or -c is supplied; the original passed
                           # neither.
                           '-m',
                           # -O z: write bgzip-compressed VCF so the output
                           # actually matches its .vcf.gz name.
                           '-O', 'z',
                           '-o', '/data/sample.calls.vcf.gz',
                           '--threads', str(job.cores),
                           '/data/sample.vcf.gz'],
               tool='quay.io/ucsc_cgl/bcftools')
    end_time = time.time()
    log_runtime(job, start_time, end_time, 'bcftools call')

    return job.fileStore.writeGlobalFile(os.path.join(work_dir, 'sample.calls.vcf.gz'))

0 comments on commit 2929f96

Please sign in to comment.