-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcreate_imgtdb.py
executable file
·221 lines (184 loc) · 7.48 KB
/
create_imgtdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# -*- coding: utf-8 -*-
#
# create_imgtdb.py
# Copyright (c) 2017 Be The Match operated by National Marrow Donor Program. All Rights Reserved.
#
# This library is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License, or (at
# your option) any later version.
#
# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
#
# > http://www.fsf.org/licensing/licenses/lgpl.html
# > http://www.opensource.org/licenses/lgpl-license.php
#
import re
import os
import sys
import logging
import argparse
import pandas as pd
import urllib.request
from Bio import SeqIO
from BioSQL import BioSeqDatabase
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
level=logging.INFO)
skip_alleles = ["HLA-DRB5*01:11", "HLA-DRB5*01:12", "HLA-DRB5*01:13",
"HLA-DRB5*02:03", "HLA-DRB5*02:04", "HLA-DRB5*02:05",
"HLA-DRB5*01:01:02", "HLA-DRB5*01:03", "HLA-DRB5*01:05",
"HLA-DRB5*01:06", "HLA-DRB5*01:07", "HLA-DRB5*01:09",
"HLA-DRB5*01:10N"]
def download_dat(db):
url = 'https://raw.githubusercontent.com/ANHIG/IMGTHLA/' + db + '/hla.dat'
dat = ".".join([db, "hla", "dat"])
urllib.request.urlretrieve(url, dat)
return dat
def download_allelelist(db):
url = 'https://raw.githubusercontent.com/ANHIG/IMGTHLA/Latest/Allelelist.' + db + '.txt'
alist = ".".join([db, "Allelelist", "txt"])
urllib.request.urlretrieve(url, alist)
return alist
def main():
"""This is run if file is directly executed, but not if imported as
module. Having this in a separate function allows importing the file
into interactive python, and still able to execute the
function for testing"""
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="Option for running in verbose",
action='store_true')
parser.add_argument("-n", "--number",
required=False,
help="Number of IMGT/DB releases",
default=1,
type=int)
parser.add_argument("-r", "--releases",
required=False,
help="IMGT/DB releases",
type=str)
args = parser.parse_args()
releases = args.releases
number = args.number
if args.verbose:
verbose = True
else:
verbose = False
if releases:
dblist = [db for db in releases.split(",")]
else:
try:
versions_url = "https://www.ebi.ac.uk/ipd/imgt/hla/docs/release.html"
df = pd.read_html(versions_url)[0]
x = df.columns
dblist = [l.replace(".", '')
for l in df[x[0]].tolist()[0:number]]
except ValueError as err:
db_error = "Failed to load DB list: {0}".format(err)
logging.info(db_error)
logging.info("Defaulting to Latest")
dblist = ["Latest"]
# Connecting to mysql DB
server = BioSeqDatabase.open_database(driver="pymysql", user="root",
passwd="my-secret-pw", host="localhost",
db="bioseqdb")
if verbose:
dbversions_str = ",".join(dblist)
logging.info("IMGT/HLA DB Versions = " + dbversions_str)
# Looping through DB verions
for dbv in dblist:
# Downloading hla.dat file
hladat = download_dat(dbv)
if verbose:
logging.info("Finished downloading hla.dat file for " + str(dbv))
# Downloading allele list
allele_list = download_allelelist(dbv)
if verbose:
logging.info("Finished downloading allele list for " + str(dbv))
hla_names = {}
try:
# File formats change...
with open(allele_list, 'r') as f:
for line in f:
line = line.rstrip()
if re.search("#", line) or re.search("AlleleID", line):
continue
accession, name = line.split(",")
hla_names.update({accession: name})
f.close()
if verbose:
nalleles = len(hla_names.keys())
logging.info("Finished loading " + str(nalleles)
+ " alleles for " + str(dbv))
except ValueError as err:
list_error = "Allelelist error: {0}".format(err)
logging.error(list_error)
server.close()
os.remove(hladat)
os.remove(allele_list)
sys.exit()
cmd = "perl -p -i -e 's/[^\\x00-\\x7F]//g' " + hladat
os.system(cmd)
# Loading sequence data from hla.dat file
try:
seq_list = list(SeqIO.parse(hladat, "imgt"))
except:
#read_error = "Read dat error: {0}".format(err)
logging.error("ERROR LOADING!!")
server.close()
os.remove(hladat)
os.remove(allele_list)
sys.exit()
new_seqs = {"A": [], "B": [], "C": [], "DRB1": [],
"DQB1": [], "DRB3": [], "DRB4": [], "DRB5": [],
"DQA1": [], "DPA1": [], "DPB1": []}
# Changing the sequence name to
# the HLA allele name instead of the accession
for seq in seq_list:
if seq.name in hla_names:
loc, allele = hla_names[seq.name].split("*")
if loc in new_seqs:
hla_name = "HLA-" + hla_names[seq.name]
if not hla_name in skip_alleles:
seq.name = hla_name
new_seqs[loc].append(seq)
dbsp = list(dbv)
descr = ".".join([dbsp[0], dbsp[1]+dbsp[2], dbsp[3]])
if verbose:
logging.info("Loaded IMGT dat file " + descr)
# Looping through and loading each locus
for locus in new_seqs:
dbname = dbv + "_" + locus
dbdescription = "IMGT/HLA " + descr + " " + locus
db = server.new_database(dbname, description=dbdescription)
try:
count = db.load(new_seqs[locus])
except:
load_fail = sys.exc_info()[0]
logging.error("Faild to load " + load_fail)
server.close()
os.remove(hladat)
os.remove(allele_list)
sys.exit()
if verbose:
logging.info("Loaded " + str(count) + " for " + dbname)
# Commiting data to mysql db
server.commit()
# Removing hla.dat and allele list files
os.remove(hladat)
os.remove(allele_list)
if verbose:
logging.info("Finished loading " + descr)
server.close()
if __name__ == '__main__':
"""The following will be run if file is executed directly,
but not if imported as a module"""
main()