-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.py
67 lines (54 loc) · 1.97 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import csv
import ipaddress
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
def process_asn(asn):
    """Convert an ASN such as ``"AS15169"`` into its integer number.

    Args:
        asn: The ASN as text, with or without a leading ``AS``, or any value
            whose ``str()`` form is such text. Falsy values (``None``,
            ``""``, ``0``) are treated as "no ASN".

    Returns:
        The numeric ASN, or ``0`` when the value is empty, whitespace-only,
        or a bare ``AS`` with no digits after it.

    Raises:
        ValueError: If the text left after removing the prefix is not a
            valid integer.
    """
    if not asn:
        return 0
    text = str(asn).strip()
    # Remove only a leading "AS": it is a prefix marker, not something to
    # delete wherever it happens to appear inside the value.
    if text.startswith("AS"):
        text = text[2:].strip()
    # Whitespace-only input or a lone "AS" leaves nothing to parse; report it
    # as "no ASN" instead of letting int("") raise.
    if not text:
        return 0
    return int(text)
def ip_to_cidr(start_ip, end_ip):
    """Return the first CIDR network of the range ``start_ip``..``end_ip``.

    The address family is chosen from ``start_ip``: a ``":"`` in it selects
    IPv6, anything else is parsed as IPv4. Both ends are parsed with that
    same family.

    Note:
        ``ipaddress.summarize_address_range`` may yield several networks for
        a range that is not a single aligned block. Only the first one is
        returned, so such a range is represented by its leading block alone.

    Args:
        start_ip: First address of the range, as a string.
        end_ip: Last address of the range, as a string.

    Returns:
        The network in ``"address/prefix"`` form, or ``None`` when the
        inputs cannot be parsed or do not form a valid range.
    """
    try:
        if ":" in start_ip:  # IPv6
            first = ipaddress.IPv6Address(start_ip)
            last = ipaddress.IPv6Address(end_ip)
        else:  # IPv4
            first = ipaddress.IPv4Address(start_ip)
            last = ipaddress.IPv4Address(end_ip)
        return str(next(ipaddress.summarize_address_range(first, last)))
    except (TypeError, ValueError):
        # ValueError covers unparseable addresses (AddressValueError is a
        # ValueError subclass) and an end address lower than the start;
        # TypeError covers non-string inputs such as None. Anything else,
        # notably KeyboardInterrupt and SystemExit, is left to propagate.
        return None
def process_chunk(chunk):
    """Convert a batch of parsed input rows into output rows.

    Each input row is read by position: fields 0 and 1 are the start and end
    of an IP range, field 2 the country code, field 4 the continent code,
    field 6 the AS number and field 7 the AS name.

    Args:
        chunk: Iterable of rows, each a sequence of field values.

    Returns:
        A list of ``[cidr, country_code, continent_code, as_number, as_name]``
        lists. Rows that are too short, or whose range cannot be turned into
        a CIDR, are left out.
    """
    # Highest index read below is 7, so a usable row has at least 8 fields.
    min_fields = 8
    results = []
    for row in chunk:
        # csv.reader yields [] for blank lines, and truncated rows would
        # raise IndexError below; skip them the same way rows with an
        # unparseable range are skipped, rather than aborting the batch.
        if len(row) < min_fields:
            continue
        cidr = ip_to_cidr(row[0], row[1])
        if not cidr:
            continue
        results.append(
            [
                cidr,  # CIDR
                row[2],  # country_code
                row[4],  # continent_code
                process_asn(row[6]),  # as_number
                row[7] or "",  # as_name
            ]
        )
    return results
def process_csv(input_file, output_file, chunk_size=5000):
    """Convert an IP-range CSV into a CIDR-keyed CSV.

    The first line of ``input_file`` is skipped as a header. The remaining
    rows are read in chunks, converted by ``process_chunk`` on a thread pool,
    and written to ``output_file`` in input order under the header
    ``cidr, country_code, continent_code, as_number, as_name``.

    Args:
        input_file: Path of the CSV to read.
        output_file: Path of the CSV to write; an existing file is
            overwritten.
        chunk_size: Number of input rows handed to each worker task.
    """
    max_workers = 8
    # Set the encoding explicitly so the result does not depend on the
    # platform's locale default, and use newline="" on both files as the csv
    # module requires for correct handling of embedded line breaks.
    with open(input_file, "r", encoding="utf-8", newline="") as f_in, open(
        output_file, "w", encoding="utf-8", newline=""
    ) as f_out:
        reader = csv.reader(f_in)
        # Skip the header; the default keeps a completely empty input file
        # from raising StopIteration.
        next(reader, None)
        writer = csv.writer(f_out)
        writer.writerow(
            ["cidr", "country_code", "continent_code", "as_number", "as_name"]
        )
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Keep a bounded window of submitted chunks. Waiting on each
            # future right after submitting it would leave every worker but
            # one idle; draining the oldest future first keeps the output in
            # input order, and the bound keeps memory use flat.
            pending = deque()
            while True:
                chunk = list(islice(reader, chunk_size))
                if not chunk:
                    break
                pending.append(executor.submit(process_chunk, chunk))
                if len(pending) >= max_workers:
                    writer.writerows(pending.popleft().result())
            # Flush whatever is still in flight once the input is exhausted.
            while pending:
                writer.writerows(pending.popleft().result())
# Script entry point: convert the bundled range file when run directly.
if __name__ == "__main__":
    process_csv(
        input_file="libs/country_asn.csv",
        output_file="libs/ipinfo-lite.csv",
    )