-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathkeep_central_snp.py
119 lines (87 loc) · 3.8 KB
/
keep_central_snp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
# Copyright 2019 Duarte Teomoteo Balata <[email protected]>
# keep_central_snps.py is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# keep_central_snps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with keep_central_snps. If not, see <http://www.gnu.org/licenses/>.
import argparse
import sys
def get_args(args):
parser = argparse.ArgumentParser(prog='python3')
parser.add_argument("input", metavar="input.vfc", type=str,
help="VCF file with all the SNPs.")
parser.add_argument("-o", "--output", metavar="output.vcf", dest="output", type=str,
help="output file with the SNPs closest to the centre of each locus (default = ./center_snps_filtered.vcf)")
parser.add_argument("-l", "--len", metavar="int", dest="length", type=str,
help="length of each locus (required)", required=True)
arguments = parser.parse_args(args)
return arguments
# writes vcf header lines to output
def write_vcf_headers(vcf_path, output_path):
vcf_file = open(vcf_path, "r")
out_file = open(output_path, "w")
for line in vcf_file:
if line.startswith("#"):
out_file.write(line)
else:
break
vcf_file.close()
out_file.close()
# Chooses the most central SNP based on loci ID and SNP read position
def write_vcf_body(vcf_path, output_path, locus_length):
vcf_file = open(vcf_path, "r")
out_file = open(output_path, "a")
buff = []
dist = 0
last_id = ""
last_name = ""
loci_id = ""
for line in vcf_file:
try:
name = line.split("\t")[0]
dist = int(line.split("\t")[1])
loci_id = line.split("\t")[2].split(":")[0]
# if locus/ chromossome is different from last line, writes the
# previous best snps and creates a new list
if last_id != loci_id or last_name != name:
if len(buff) > 0:
prev_best = int(locus_length)/2
for loci in buff:
loci_position = int(loci.split("\t")[2].split(":")[1])
# the snp ocurring closer to the read center is considered optimal.
if abs(int(locus_length)/2-loci_position) <= prev_best:
center = loci
prev_best = abs(int(locus_length)/2-loci_position)
out_file.write(center)
buff = []
buff.append(line)
else:
buff.append(line)
last_id = loci_id
last_name = name
except (ValueError, IndexError) as error:
pass
# writes last iteration
prev_best = int(locus_length)/2
for loci in buff:
loci_position = int(loci.split("\t")[2].split(":")[1])
if abs(int(locus_length)/2-loci_position) < prev_best:
center = loci
prev_best = abs(int(locus_length)/2-loci_position)
out_file.write(center)
vcf_file.close()
out_file.close()
if __name__ == "__main__":
args = get_args(sys.argv[1:])
try:
write_vcf_headers(args.input, args.output)
write_vcf_body(args.input, args.output, args.length)
except IndexError:
write_vcf_headers(args.input, "central_snps_filtered.vcf")
write_vcf_body(args.input, "central_snps_filtered.vcf", args.length)