fastaStats.py
#!/usr/bin/env python3
# Parse a fasta file, split into windows and compute stats
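# Example invocation (illustrative; the fasta file name is a placeholder, flags as defined below):
#   ./fastaStats.py -f genome.fasta -w 1000 -v 500 -k 4 -o fastaStats_output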
import sys
import argparse
import os
import shutil
import numpy as np
# local imports
from utils import fasta
from utils import windows
from utils import GC
from utils import kmers
from utils import dictionaries
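# NOTE: the helpers below come from the local utils package; their behaviour is assumed
# here, inferred only from how this script calls them:
#   fasta.parse_fasta(path)                           -> yields (header, sequence) tuples
#   windows.slidingWindow(seq, size, step, fillvalue) -> yields windows of length `size`, padding the last one with `fillvalue`
#   GC.GCStats(seq)                                   -> returns (GC percent, GC skew, per-base count dict)
#   kmers.getUniqueKmers(seq, k)                      -> returns the number of distinct k-mers in seq
#   dictionaries.mergeDictionaries(a, b)              -> returns the key-wise sum of two count dicts
# A minimal sketch of the assumed slidingWindow behaviour, for reference only:
#   def slidingWindow(seq, size, step, fillvalue="-"):
#       for start in range(0, len(seq), step):
#           yield seq[start:start + size].ljust(size, fillvalue)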
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--fasta", type=str, help = "The input fasta", required=True)
parser.add_argument("-w", "--windowSize", type=int, help = "The size of windows to iterate over. Default 1000", default=1000)
parser.add_argument("-v", "--overlap", type=int, help = "The overlap of windows. Default is same as window size - i.e. no overlap", default=1000)
parser.add_argument("-k", "--kmerLength", type=int, help = "The length of kmer to calculate diversity of. Default is 4 - i.e. tetranucleotide diversity", default=4)
parser.add_argument("-o", "--outdir", type=str, help = "The output directory for GC stats", default="fastaStats_output")
args = parser.parse_args()
input_fasta = args.fasta
windowSize = args.windowSize
overlap = args.overlap
kmerLength = args.kmerLength
outdir = args.outdir
if os.path.exists(outdir):
    print("[WARNING] \tRemoving existing output directory ('" + outdir + "') and its contents.")
    shutil.rmtree(outdir)
os.mkdir(outdir)
# write a general info file
GeneralStats = open(outdir + '/' + 'GeneralStats.txt', 'w')
GeneralStats.write('Arguments used: ' + " ".join(sys.argv[1:]) + '\n')
# write to CSV
GCperWindowCSV = open(outdir + '/' + 'FastaStats.csv', 'w')
GCperWindowCSV.write('ID,bin,GCPercent,GCSkew,UniqueKmers\n')
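# an illustrative row (header and values are made up): contig_1,0-1000,43.7,0.021,198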
# bin integer
bin_int = 0
# keep track of fastas being processed
fastaCount = 0
totalSeqLength = 0
seq_lengths = []
totalNucleotideCounts = {}
# go through each fasta
for header, sequence in fasta.parse_fasta(input_fasta):
    # general data
    fastaCount += 1
    seq_length = len(sequence)
    seq_lengths.append(seq_length)
    totalSeqLength += seq_length
    # begin sliding windows
    wins = windows.slidingWindow(sequence, size=windowSize, step=overlap, fillvalue="-")
    # let's count the windows
    i = 0
    for window in wins:
        if i % 100 == 0:
            print("[STATUS] \t" + str(i) + " windows processed for " + str(header) + ".", end = "\r")
        # get the sequence
        seq = ''.join(window)
        # calculate GC stats
        currentWindow = str(bin_int) + '-' + str(bin_int + windowSize)
        PerGC, GCSkew, nucleotideCounts = GC.GCStats(seq)
        # calculate kmer stats
        UniqueKmers = kmers.getUniqueKmers(seq, kmerLength)
        formattedHeader = header.replace(">", "")
        GCperWindowCSV.write(formattedHeader + ',' + currentWindow + ',' + str(PerGC) + ',' + str(GCSkew) + ',' + str(UniqueKmers) + '\n')
        # keep running total of nucleotide counts
        totalNucleotideCounts = dictionaries.mergeDictionaries(totalNucleotideCounts, nucleotideCounts)
        # advance the window start coordinate, resetting to 0 once the end of the contig is reached
        if bin_int < seq_length - overlap:
            bin_int += overlap
        else:
            bin_int = 0
        i += 1
    # end sliding windows
    print("\n[STATUS] \t" + "contig " + str(fastaCount) + " processed.")
GCperWindowCSV.close()
# general stats about the genome
GeneralStats.write("Total number of contigs processed: " + str(fastaCount) + '\n')
GeneralStats.write("Total sequence length processed: " + str(totalSeqLength) + '\n')
# global GC (use .get so a base that never occurs does not raise a KeyError;
# reported as a percentage to match the label)
gcCount = totalNucleotideCounts.get('G', 0) + totalNucleotideCounts.get('C', 0)
atCount = totalNucleotideCounts.get('A', 0) + totalNucleotideCounts.get('T', 0)
globalGC = 100 * gcCount / (gcCount + atCount)
GeneralStats.write("Global GC%: " + str(globalGC) + "\n")
# calculate L10-50 & N10-50
lengthsArray = np.array(seq_lengths)
sortedLengths = lengthsArray[np.argsort(-lengthsArray)]
cumulativeSums = np.cumsum(sortedLengths)
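# Nx / Lx definitions used below: Nx is the length of the shortest contig among the largest
# contigs that together cover at least x% of the total assembly length, and Lx is how many
# contigs that takes. Tiny worked example: for sorted lengths [50, 30, 20] (total 100),
# the cumulative sums are [50, 80, 100], so N50 = 50 and L50 = 1.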
for level in [10, 20, 30, 40, 50]:
    nx = int(totalSeqLength * (level / 100))
    # index of the first contig at which the cumulative length reaches level% of the total
    idx = int(np.searchsorted(cumulativeSums, nx))
    # Lx is conventionally the 1-based number of contigs needed; Nx is the length of that contig
    l_level = idx + 1
    n_level = int(sortedLengths[idx])
    GeneralStats.write("L" + str(level) + ": " + str(l_level) + "\n")
    GeneralStats.write("N" + str(level) + ": " + str(n_level) + "\n")
GeneralStats.close()
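# On completion the output directory (default "fastaStats_output") contains:
#   GeneralStats.txt - the arguments used, contig count, total length, global GC% and N/L values
#   FastaStats.csv   - per-window GC percent, GC skew and unique k-mer counts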