From 88cc140becc13660ad1cd22aa75875a981050bb1 Mon Sep 17 00:00:00 2001 From: "Vipin T. Sreedharan" Date: Fri, 18 May 2012 17:41:22 +0200 Subject: [PATCH] Prefiltering of FASTQ files --- Ler_transcriptome/distributed_feature.py | 40 +-- .../distributed_feature_stats.py | 252 ------------------ Ler_transcriptome/utils/condense_convert.py | 54 ++++ Ler_transcriptome/utils/read_length_dist.py | 8 +- Ler_transcriptome/venn_features.py | 60 ----- gfftools/codebase/MergeLocus.py | 3 +- hpc/map_reduce.py | 179 +++++++++++++ hpc/multiprocessing_mapreduce.py | 54 ++++ nextgen/fastq_process.py | 14 + 9 files changed, 333 insertions(+), 331 deletions(-) delete mode 100644 Ler_transcriptome/distributed_feature_stats.py create mode 100644 Ler_transcriptome/utils/condense_convert.py delete mode 100644 Ler_transcriptome/venn_features.py create mode 100644 hpc/map_reduce.py create mode 100644 hpc/multiprocessing_mapreduce.py create mode 100644 nextgen/fastq_process.py diff --git a/Ler_transcriptome/distributed_feature.py b/Ler_transcriptome/distributed_feature.py index 7e29b0f..daab899 100644 --- a/Ler_transcriptome/distributed_feature.py +++ b/Ler_transcriptome/distributed_feature.py @@ -73,10 +73,10 @@ def map_fn(items): cgOR, othOR, ribOR, pdOR, teOR, intOR = [], [], [], [], [], [] for rdet in rinfo: read_strand.append(rdet[2]) - [ribXrdx, cgXrdx, othXrdx, teXrdx, pdXrdx] = [0, 0, 0, 0, 0] + ribXrdx, cgXrdx, othXrdx, teXrdx, pdXrdx=0, 0, 0, 0, 0 if rdet[0] in cg_featdb: for details, strand_info in cg_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]+10: + if details[0]-42 <= rdet[1] and rdet[1] <= details[1]+5: cgX+=1 cgXrdx=1 if rdet[2]==strand_info: @@ -88,7 +88,7 @@ def map_fn(items): break if rdet[0] in oth_featdb: for details, strand_info in oth_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]: + if details[0]-42 <= rdet[1] and rdet[1] <= details[1]+5: othX+=1 othXrdx=1 if rdet[2]==strand_info: @@ -100,7 +100,7 @@ def map_fn(items): break if rdet[0] in ribo_featdb: for details, strand_info in ribo_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]+10: + if details[0]-42 <= rdet[1] and rdet[1] <= details[1]+5: ribX+=1 ribXrdx =1 if rdet[2]==strand_info: @@ -112,7 +112,7 @@ def map_fn(items): break if rdet[0] in te_featdb: for details, strand_info in te_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]: + if details[0]-42 <= rdet[1] and rdet[1] <= details[1]+5: teX+=1 teXrdx=1 if rdet[2]==strand_info: @@ -124,7 +124,7 @@ def map_fn(items): break if rdet[0] in psd_featdb: for details, strand_info in psd_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]: + if details[0]-42 <= rdet[1] and rdet[1] <= details[1]+5: pdX+=1 pdXrdx=1 if rdet[2]==strand_info: @@ -136,7 +136,7 @@ def map_fn(items): break if [ribXrdx, cgXrdx, othXrdx, teXrdx, pdXrdx].count(0)==5: intXrd = 1 - if rdet[2]==strand_info: + if rdet[2]=='+': intOR.append(1) else: intOR.append(-1) @@ -314,20 +314,28 @@ def MakeReadDB(fbam): fh_txt.close() return bamdb -if __name__ == "__main__": +def breakDown(rawInput, chunk_size): + small_packs=dict() + for i in range(0, len(rawInput), chunk_size): + small_packs[i]=rawInput[i:i+chunk_size] + return small_packs + +if __name__=="__main__": try: - bamf = sys.argv[1] - anno_file = sys.argv[2] + bamf=sys.argv[1] + anno_file=sys.argv[2] except: print __doc__ sys.exit(-1) print time.asctime( time.localtime(time.time()) ) - te_featdb, psd_featdb, cg_featdb, oth_featdb, ribo_featdb = dict(), dict(), dict(), dict(), dict() # declaring variable - te_featdb, psd_featdb, cg_featdb, oth_featdb, ribo_featdb = get_Feature(anno_file) # parse function for getting annotation - #read_db = AlignGenerator(bamf) # parse function for getting read alignment information - read_db = MakeReadDB(bamf) # Take the splited BAM content - master_job = MapReduce(map_fn, reduce_fn, 6) # create an object such that the jobs are distributing in 31 CPU's - results = master_job(read_db) # start the core the job + te_featdb, psd_featdb, cg_featdb, oth_featdb, ribo_featdb=dict(), dict(), dict(), dict(), dict() # declaring variable + #te_featdb, psd_featdb, cg_featdb, oth_featdb, ribo_featdb=get_Feature(anno_file) # parse function for getting annotation + read_db=AlignGenerator(bamf) # parse function for getting read alignment information + print time.asctime( time.localtime(time.time()) ) + #read_db = MakeReadDB(bamf) # Take the splited BAM content + chunk_packs=breakDown(read_db, 100) + master_job=MapReduce(map_fn, reduce_fn, 6) # create an object such that the jobs are distributing in 31 CPU's + results = master_job(chunk_packs, len(chunk_packs)) # start the core the job print for element in sorted(results): print element[0][0], element[0][1] diff --git a/Ler_transcriptome/distributed_feature_stats.py b/Ler_transcriptome/distributed_feature_stats.py deleted file mode 100644 index 6ea93f3..0000000 --- a/Ler_transcriptome/distributed_feature_stats.py +++ /dev/null @@ -1,252 +0,0 @@ -#!/usr/bin/python -""" -Program to calculate the statistics of reads aligned to different annotated -features in a genome. This is a time-cost effective method of implementation -using the module called multiprocessing, along with MapReduce functionality. - -Usage: distributed_feature_stats.py in.bam in.gff3 -""" -import re, sys -import time -import pysam -import multiprocessing -import collections -import itertools - -class MapReduce(object): - - def __init__(self, map_func, reduce_func, num_workers=None): - """ - map_func - Function to map inputs to intermediate data. Takes as - argument one input value and returns a tuple with the key - and a value to be reduced. - - reduce_func - Function to reduce partitioned version of intermediate data - to final output. Takes as argument a key as produced by - map_func and a sequence of the values associated with that - key. - - num_workers - The number of workers to create in the pool. Defaults to the - number of CPUs available on the current host. - """ - self.map_func = map_func - self.reduce_func = reduce_func - self.pool = multiprocessing.Pool(num_workers) - - def partition(self, mapped_values): - """Organize the mapped values by their key. - Returns an unsorted sequence of tuples with a key and a sequence of values. - """ - partitioned_data = collections.defaultdict(list) - for key, value in mapped_values: - partitioned_data[key].append(value) - return partitioned_data.items() - - def __call__(self, inputs, chunksize=1): - """Process the inputs through the map and reduce functions given. - - inputs - An iterable containing the input data to be processed. - - chunksize=1 - The portion of the input data to hand to each worker. This - can be used to tune performance during the mapping phase. - """ - map_responses = self.pool.map(self.map_func, inputs, chunksize=chunksize) - partitioned_data = self.partition(itertools.chain(*map_responses)) - reduced_values = self.pool.map(self.reduce_func, partitioned_data) - return reduced_values - -def map_fn(items): - """Map function checking each reads location at the genome. - """ - rid, rinfo = items - sys.stdout.write(rid);sys.stdout.flush() - sys.stdout.write('\r' + ' '*len(rid)) - sys.stdout.flush() - read_strand=[] - ribX, cgX, othX, teX, pdX = 0, 0, 0, 0, 0 - intXrd, ribXrd, cgXrd, othXrd, teXrd, pdXrd = 0, 0, 0, 0, 0, 0 - for rdet in rinfo: - read_strand.append(rdet[2]) - [ribXrdx, cgXrdx, othXrdx, teXrdx, pdXrdx] = [0, 0, 0, 0, 0] - if rdet[0] in cg_featdb: - for details, strand_info in cg_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]+10: - cgX+=1 - cgXrdx=1 - if rdet[2]==strand_info: - cgXrd=1 - else: - cgXrd=-1 - break - if rdet[0] in oth_featdb: - for details, strand_info in oth_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]: - othX+=1 - othXrdx=1 - if rdet[2]==strand_info: - othXrd=1 - else: - othXrd=-1 - break - if rdet[0] in ribo_featdb: - for details, strand_info in ribo_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]+10: - ribX+=1 - ribXrdx =1 - if rdet[2]==strand_info: - ribXrd=1 - else: - ribXrd=-1 - break - if rdet[0] in te_featdb: - for details, strand_info in te_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]: - teX+=1 - teXrdx=1 - if rdet[2]==strand_info: - teXrd=1 - else: - teXrd=-1 - break - if rdet[0] in psd_featdb: - for details, strand_info in psd_featdb[rdet[0]].items(): - if details[0]-95 <= rdet[1] and rdet[1] <= details[1]: - pdX+=1 - pdXrdx=1 - if rdet[2]==strand_info: - pdXrd=1 - else: - pdXrd=-1 - break - if [ribXrdx, cgXrdx, othXrdx, teXrdx, pdXrdx].count(0)==5: - intXrd = 1 - xq = [] - if [ribXrd, cgXrd, othXrd, teXrd, pdXrd].count(0)==5: # intergenic read - xq = [('int_NR', 1), ('int_NA', len(rinfo))] - elif [ribXrd, cgXrd, othXrd, teXrd, pdXrd, intXrd].count(0)<=4: # multiple alignments spanning across different features - xq = [('sh_NR', 1), ('sh_NA', len(rinfo))] - elif cgXrd !=0: - xq = [('cg_NR', 1), ('cg_NA', cgX)] - elif othXrd !=0: - xq = [('oth_NR', 1), ('oth_NA', othX)] - elif teXrd !=0: - xq = [('te_NR', 1), ('te_NA', teX)] - elif pdXrd !=0: - xq = [('pd_NR', 1), ('pd_NA', pdX)] - elif ribXrd !=0: - xq = [('rib_NR', 1), ('rib_NA', ribX)] - return xq - -def reduce_fn(sub_items): - """Reduce function collecting each feature count - """ - ftype, count_ftype = sub_items - return [(ftype, sum(count_ftype))] - -def get_Feature(fname): - """Extract genome annotation information from a provided GFF file. - """ - from Core import GFF - global te_featdb; global psd_featdb; global cg_featdb; global oth_featdb; global ribo_featdb - for cid in ['Chr1', 'Chr2', 'Chr3', 'Chr4', 'Chr5', 'ChrM', 'ChrC']: ## change according to the GFF file TODO According to the file type add chromosome number automatically. - print cid - fh = open(fname, 'rU') - limit_info = dict(gff_id=[cid], gff_source=['TAIR10']) ## change the source TODO According to the file type add source flag automatically - for rec in GFF.parse(fh, limit_info=limit_info): - for each_rec in rec.features: - if each_rec.type=='gene': - for child in each_rec.sub_features: - if child.type=='mRNA': - if cid in cg_featdb: - cg_featdb[cid][(int(each_rec.location._start.position), - int(each_rec.location._end.position))]=each_rec.strand - else: - cg_featdb[cid]={(int(each_rec.location._start.position), - int(each_rec.location._end.position)): - each_rec.strand} - elif each_rec.sub_features[0].type in ['miRNA', 'ncRNA', 'snoRNA', 'snRNA', 'tRNA']: - if cid in oth_featdb: - oth_featdb[cid][(int(each_rec.location._start.position), - int(each_rec.location._end.position))]=each_rec.strand - else: - oth_featdb[cid]={(int(each_rec.location._start.position), - int(each_rec.location._end.position)): - each_rec.strand} - elif each_rec.sub_features[0].type=='rRNA': - if cid in ribo_featdb: - ribo_featdb[cid][(int(each_rec.location._start.position), - int(each_rec.location._end.position))]=each_rec.strand - else: - ribo_featdb[cid]={(int(each_rec.location._start.position), - int(each_rec.location._end.position)): - each_rec.strand} - elif each_rec.type=='pseudogene': - if cid in psd_featdb: - psd_featdb[cid][(int(each_rec.location._start.position), - int(each_rec.location._end.position))]=each_rec.strand - else: - psd_featdb[cid]={(int(each_rec.location._start.position), - int(each_rec.location._end.position)): - each_rec.strand} - elif each_rec.type in ['transposable_element', 'transposable_element_gene']: - if cid in te_featdb: - te_featdb[cid][(int(each_rec.location._start.position), - int(each_rec.location._end.position))]=each_rec.strand - else: - te_featdb[cid]={(int(each_rec.location._start.position), - int(each_rec.location._end.position)): - each_rec.strand} - fh.close() - ribo_featdb['Chr3'][(14199917, 14203578)]=1 ## Unannotated rRNA from A thaliana genome. - ribo_featdb['Chr2'][(5784, 9683)]=1 ## Unannotated rRNA from A thaliana genome. - ribo_featdb['Chr2'][(2821, 3704)]=1 # - ribo_featdb['Chr3'][(14196614, 14197675)]=1 # - ribo_featdb['Chr3'][(14194052, 14194611)]=1 # - ribo_featdb['Chr3'][(14199498, 14199751)]=1 # - ribo_featdb['Chr3'][(14195564, 14195739)]=1 # - ribo_featdb['ChrM'][(11426, 11883)]=-1 # - ribo_featdb['ChrM'][(364594, 365124)]=1 #""" - return te_featdb, psd_featdb, cg_featdb, oth_featdb, ribo_featdb - -def AlignGenerator(fbam): - """Parsing alignment file, extracting the information. - """ - samdb = dict() - smap={'+' : 1, '-' : -1} - samfile = pysam.Samfile(fbam, 'rb') - for rec in samfile.fetch(): - for cont in rec.tags: - if cont[0]=='NM':NM=cont[1] - if cont[0]=='XS': - orient=cont[1] - break - if rec.qname in samdb: - samdb[rec.qname].append((samfile.getrname(rec.rname), rec.pos, smap[orient], NM)) - else: - samdb[rec.qname]=[(samfile.getrname(rec.rname), rec.pos, smap[orient], NM)] - samfile.close() - bamdb = [(fid, finfo) for fid, finfo in samdb.items()] - return bamdb - -if __name__ == "__main__": - try: - bamf = sys.argv[1] - anno_file = sys.argv[2] - except: - print __doc__ - sys.exit(-1) - print time.asctime( time.localtime(time.time()) ) - te_featdb, psd_featdb, cg_featdb, oth_featdb, ribo_featdb = dict(), dict(), dict(), dict(), dict() # declaring variable - te_featdb, psd_featdb, cg_featdb, oth_featdb, ribo_featdb = get_Feature(anno_file) # parse function for getting annotation - read_db = AlignGenerator(bamf) # parse function for getting read alignment information - master_job = MapReduce(map_fn, reduce_fn, 31) # create an object such that the jobs are distributing in 31 CPU's - results = master_job(read_db) # start the core the job - print - for element in sorted(results): - print element - print time.asctime( time.localtime(time.time()) ) diff --git a/Ler_transcriptome/utils/condense_convert.py b/Ler_transcriptome/utils/condense_convert.py new file mode 100644 index 0000000..bf03e2b --- /dev/null +++ b/Ler_transcriptome/utils/condense_convert.py @@ -0,0 +1,54 @@ +#!/usr/bin/python +"""Program to convert condensed state alignment file to +COL-0 coordinate state. + +Usage: condense_convert.py in.scafold_map in.mummer_map +""" +import re, sys + +def grep_scaf(fname): + """read condense mapping file and create a mapping db + between condensed contigs and original scafolds. + """ + cont_scaf = dict() + scafh=open(fname) + for line in scafh: + line=line.strip('\n\r').split('\t') + if re.match(r"^#", line[0]) or line[1]=="NSPACER": + continue + if line[0] in cont_scaf: + cont_scaf[line[0]][(int(line[2]), int(line[3]))]=line[1] + else: + cont_scaf[line[0]]={(int(line[2]), int(line[3])):line[1]} + scafh.close() + return cont_scaf + +def grep_col(fname): + """read mummer whole genome alignment file and fetch + the scafold to main genome contig mapping relation. + """ + scaf_chr=dict() + fh=open(fname) + for line in fh: + line=line.strip('\n\r').split('\t') + if re.match(r"^#", line[0]): + continue + if line[2] in scaf_chr: + scaf_chr[line[2]][(int(line[4]), int(line[4])+int(line[6])-1)]=(line[1], (int(line[3]), int(line[3])+int(line[5])-1), line[7]) + else: + scaf_chr[line[2]]={(int(line[4]), int(line[4])+int(line[6])-1):(line[1], (int(line[3]), int(line[3])+int(line[5])-1), line[7])} + fh.close() + return scaf_chr + +def __main__(): + try: + scafname = sys.argv[1] + mumfname = sys.argv[2] + except: + print __doc__ + sys.exit(-1) + maps_scaf=grep_scaf(scafname) + maps_chr=grep_col(mumfname) + # strand ? at final stage + +if __name__=="__main__":__main__() diff --git a/Ler_transcriptome/utils/read_length_dist.py b/Ler_transcriptome/utils/read_length_dist.py index 19e0e4d..f7e17b1 100644 --- a/Ler_transcriptome/utils/read_length_dist.py +++ b/Ler_transcriptome/utils/read_length_dist.py @@ -8,12 +8,14 @@ import pysam import re, sys from pylab import figure, show +import math try: samfile = pysam.Samfile(sys.argv[1], 'rb') except: print 'Incorrect arguments supplied' print __doc__ sys.exit(-1) +sname=re.search(r'.*\/(\w+)\.bam', sys.argv[1]).group(1) diff_len=dict() for alignedread in samfile.fetch(): if alignedread.qlen in diff_len: @@ -24,9 +26,13 @@ xq, yq = [], [] for length in sorted(diff_len.items()): xq.append(length[0]) - yq.append(length[1]) + yq.append(math.log10(length[1])) fig = figure() ax = fig.add_subplot(111) ax.plot(xq, yq, '-') +ax.set_xlabel('Read length (nts)') +ax.set_ylabel('Nr. of alignments (log-change)') +ax.set_title(sname) ax.grid(True) +fig.savefig(sname + '.eps', transparent=True) show() diff --git a/Ler_transcriptome/venn_features.py b/Ler_transcriptome/venn_features.py deleted file mode 100644 index 937c832..0000000 --- a/Ler_transcriptome/venn_features.py +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/python -""" -Find the reads mapped to rRNA, Coding region, TE and other structural RNA regions, both individually and in a shared way to plot as Venn diagram. -""" -import re, sys, os - -def AlignReader(fname): - - fbh = os.popen('/fml/ag-raetsch/share/software/samtools//samtools view ' + fname) - for line in fbh: - line = line.strip('\r\n').split('\t') - yield line[0] - fbh.close() - -def __main__(): - - try: - ribo_bam = sys.argv[1] - cg_bam = sys.argv[2] - te_bam = sys.argv[3] - except: - sys.stderr.write('Access denied for BAM files \n') - sys.exit(-1) - - Generator = AlignReader(ribo_bam) - ribo_read = dict() - for rid in Generator: - ribo_read[rid] = 1 - cg_read = dict() - Generator = AlignReader(cg_bam) - for rid in Generator: - cg_read[rid] = 1 - te_read = dict() - Generator = AlignReader(te_bam) - for rid in Generator: - te_read[rid] = 1 - - te_reads = [fid for fid in te_read] - te_reads = set(tuple(te_reads)) - cg_reads = [fid for fid in cg_read] - cg_reads = set(tuple(cg_reads)) - ribo_reads = [fid for fid in ribo_read] - ribo_reads = set(tuple(ribo_reads)) - - te_cg_cnt = len(te_reads & cg_reads) - te_ribo_cnt = len(te_reads & ribo_reads) - cg_ribo_cnt = len(cg_reads & ribo_reads) - te_ribo_cg_cnt = len(cg_reads & ribo_reads & te_reads) - - print 'Individual counts: rRNA, TE, Coding genes\t', len(ribo_reads), len(te_reads), len(cg_reads) - print 'TE and Coding genes\t', te_cg_cnt - print 'TE and rRNA\t', te_ribo_cnt - print 'Coding gene and rRNA\t', cg_ribo_cnt - print 'TE, Coding gene, rRNA\t', te_ribo_cg_cnt - - print 'TE bin: ', len(te_reads) - (te_ribo_cnt + te_cg_cnt + te_ribo_cg_cnt) - print 'rRNA bin: ', len(ribo_reads) - (te_ribo_cnt + cg_ribo_cnt + te_ribo_cg_cnt) - print 'Coding gene: ', len(cg_reads) - (te_cg_cnt + cg_ribo_cnt + te_ribo_cg_cnt) - -if __name__=="__main__":__main__() diff --git a/gfftools/codebase/MergeLocus.py b/gfftools/codebase/MergeLocus.py index 354a508..d0ed857 100644 --- a/gfftools/codebase/MergeLocus.py +++ b/gfftools/codebase/MergeLocus.py @@ -304,8 +304,7 @@ def ParseGFF(gff_file): gff_handle.close() return genes, transcripts, exons, utr5, cds, utr3 -import re, sys -import time +import re, sys, time import collections if __name__=='__main__': diff --git a/hpc/map_reduce.py b/hpc/map_reduce.py new file mode 100644 index 0000000..e44ba35 --- /dev/null +++ b/hpc/map_reduce.py @@ -0,0 +1,179 @@ +import threading +import Queue +import operator +import re + +class MapReduce: + """ MapReduce - to use, subclass by defining these functions, + then call self.map_reduce(): + parse_fn(self, k, v) => [(k, v), ...] + map_fn(self, k, v) => [(k, v1), (k, v2), ...] + reduce_fn(self, k, [v1, v2, ...]) => [(k, v)] + output_fn(self, [(k, v), ...]) + """ + def __init__(self): + self.data = None + self.num_worker_threads = 5 + + class SynchronizedDict(): # we need this for merging + def __init__(self): + self.lock = threading.Lock() + self.d = {} + def isin(self, k): + with self.lock: + if k in self.d: + return True + else: + return False + def get(self, k): + with self.lock: + return self.d[k] + def set(self, k, v): # we don't need del + with self.lock: + self.d[k] = v + def set_append(self, k, v): # for thread-safe list append + with self.lock: + self.d[k].append(v) + def items(self): + with self.lock: + return self.d.items() # + + def create_queue(self, input_list): # helper fn for queues + output_queue = Queue.Queue() + for value in input_list: + output_queue.put(value) + return output_queue + + def create_list(self, input_queue): # helper fn for queues + output_list = [] + while not input_queue.empty(): + item = input_queue.get() + output_list.append(item) + input_queue.task_done() + return output_list + + def merge_fn(self, k, v, merge_dict): # helper fn for merge + if merge_dict.isin(k): + merge_dict.set_append(k, v) + else: + merge_dict.set(k, [v]) + + def process_queue(self, input_queue, fn_selector): # helper fn + output_queue = Queue.Queue() + if fn_selector == 'merge': + merge_dict = self.SynchronizedDict() + def worker(): + while not input_queue.empty(): + (k, v) = input_queue.get() + if fn_selector in ['map', 'reduce']: + if fn_selector == 'map': + result_list = self.map_fn(k, v) + elif fn_selector == 'reduce': + result_list = self.reduce_fn(k, v) + for result_tuple in result_list: # flatten + output_queue.put(result_tuple) + elif fn_selector == 'merge': # merge v to same k + self.merge_fn(k, v, merge_dict) + else: + raise Exception, "Bad fn_selector="+fn_selector + input_queue.task_done() + + for i in range(self.num_worker_threads): # start threads + worker_thread = threading.Thread(target=worker) + worker_thread.daemon = True + worker_thread.start() + input_queue.join() # wait for worker threads to finish + if fn_selector == 'merge': + output_list = sorted(merge_dict.items(), key=operator.itemgetter(0)) + output_queue = self.create_queue(output_list) + return output_queue + + def map_reduce(self): # the actual map-reduce algoritm + data_dict = self.parse_fn(self.data) + data_queue = self.create_queue(data_dict) # enqueue the data so we can multi-process + map_queue = self.process_queue(data_queue, 'map') # [(k,v),...] => [(k,v1),(k,v2),...] + merge_queue = self.process_queue(map_queue, 'merge') # [(k,v1),(k,v2),...] => [(k,[v1,v2,...]),...] + reduce_queue = self.process_queue(merge_queue, 'reduce') # [(k,[v1,v2,...]),...] => [(k,v),...] + output_list = self.create_list(reduce_queue) # deque into list for output handling + self.output_fn(output_list) + +"""class WordCount(MapReduce): + + def __init__(self): + MapReduce.__init__(self) + self.min_count = 1 + + def parse_fn(self, data): # break string into [(k, v), ...] tuples for each line + data_list = map(lambda line: (None, line), data.splitlines()) + return data_list + + def map_fn(self, key, str): # return (word, 1) tuples for each word, ignore key + word_list = [] + for word in re.split(r'\W+', str.lower()): + bare_word = re.sub(r"[^A-Za-z0-9]*", r"", word); + if len(bare_word) > 0: + word_list.append((bare_word, 1)) + return word_list + + def reduce_fn(self, word, count_list): # just sum the counts + return [(word, sum(count_list))] + + def output_fn(self, output_list): # just print the resulting list + print "Word".ljust(15), "Count".rjust(5) + print "______________".ljust(15), "_____".rjust(5) + sorted_list = sorted(output_list, key=operator.itemgetter(1), reverse=True) + for (word, count) in sorted_list: + if count > self.min_count: + print word.ljust(15), repr(count).rjust(5) + print + + def test_with_monty(self): + #self.data = The Meaning of Life is: + # try and be nice to people, + # avoid eating fat, + # read a good book every now and then, + # get some walking in, + # and try and live together in peace and harmony + # with people of all creeds and nations. + self.map_reduce() + +class DistributedGrep(MapReduce): + + def __init__(self): + MapReduce.__init__(self) + self.matcher = None + + def parse_fn(self, data): # one list item per line with line number + data_list = [] + line_num = 1 + for line in data.splitlines(): + data_list.append((line_num, line)) + line_num = line_num + 1 + return data_list + + def map_fn(self, line_num, line): # return line if matches, include line num + matcher = self.matcher + matched_line = [] + if matcher.match(line): + matched_line = [(line_num, line)] + return matched_line + + def reduce_fn(self, line_num, line_list): # identity reducer + return [(line_num, line_list[0])] # we only ever have one line in the list + + def output_fn(self, output_list): # just print the resulting list + print "LineNum".rjust(8), "Line".ljust(70) + print "_______".rjust(8), "____" + for (line_num, line) in sorted(output_list, key=operator.itemgetter(0)): + print repr(line_num).rjust(8), line.ljust(70) + print + +def main(): + wc = WordCount() + wc.test_with_monty() + + #dg = DistributedGrep() + +if __name__ == "__main__": + main()""" + diff --git a/hpc/multiprocessing_mapreduce.py b/hpc/multiprocessing_mapreduce.py new file mode 100644 index 0000000..de1c4c7 --- /dev/null +++ b/hpc/multiprocessing_mapreduce.py @@ -0,0 +1,54 @@ +import collections +import itertools +import multiprocessing + +class MapReduce(object): + + def __init__(self, map_func, reduce_func, num_workers=None): + """ + map_func + + Function to map inputs to intermediate data. Takes as + argument one input value and returns a tuple with the key + and a value to be reduced. + + reduce_func + + Function to reduce partitioned version of intermediate data + to final output. Takes as argument a key as produced by + map_func and a sequence of the values associated with that + key. + + num_workers + + The number of workers to create in the pool. Defaults to the + number of CPUs available on the current host. + """ + self.map_func = map_func + self.reduce_func = reduce_func + self.pool = multiprocessing.Pool(num_workers) + + def partition(self, mapped_values): + """Organize the mapped values by their key. + Returns an unsorted sequence of tuples with a key and a sequence of values. + """ + partitioned_data = collections.defaultdict(list) + for key, value in mapped_values: + partitioned_data[key].append(value) + return partitioned_data.items() + + def __call__(self, inputs, chunksize=1): + """Process the inputs through the map and reduce functions given. + + inputs + An iterable containing the input data to be processed. + + chunksize=1 + The portion of the input data to hand to each worker. This + can be used to tune performance during the mapping phase. + """ + map_responses = self.pool.map(self.map_func, inputs, chunksize=chunksize) + partitioned_data = self.partition(itertools.chain(*map_responses)) + reduced_values = self.pool.map(self.reduce_func, partitioned_data) + return reduced_values + diff --git a/nextgen/fastq_process.py b/nextgen/fastq_process.py new file mode 100644 index 0000000..31073e8 --- /dev/null +++ b/nextgen/fastq_process.py @@ -0,0 +1,14 @@ +#!/usr/bin/python +"""FASTQ file pre-processing, before alignment to check for any invalid +read entries which were not filtered out. + +""" +import re, sys +from Bio import SeqIO + +outfq=open(sys.argv[2], 'w') +for rec in SeqIO.parse(sys.argv[1], "fastq"): + if len(rec.seq)<90: + continue + SeqIO.write(rec, outfq, 'fastq') +outfq.close()