
Commit c8d2d15

Andre Ross authored
Create haas_ioc_feed.py
added haas_ioc_feed.py Signed-off-by: Andre Ross <[email protected]>
1 parent 52590dd commit c8d2d15

File tree

1 file changed: +139 −0 lines changed

haas_ioc_feed.py

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
#!/var/www/MISP/venv/bin/python
# HaaS Feed Fetcher and Filter
# Version: 4.0
# Date: 15 October 2024
# Author: A.R.
# License: MIT

"""
This script automates the process of fetching IP addresses from the HaaS (Honeypot as a Service) feed provided by
haas.nic.cz. It downloads the latest feed, extracts unique IP addresses, filters out non-public and special-use IP
addresses as defined by the relevant RFCs, and excludes IPs already present in the `consolidated_ips.json` file.

By utilising functions from `zero_noise_ips.py`, specifically `is_non_public_ip` and `update_consolidated_ips`,
the script ensures that only public, routable IP addresses that are not known to be benign or irrelevant are retained.
The cleaned list of IPs is then saved to a specified CSV file.

This process reduces false positives and enhances the relevance of the data when integrating with security platforms
such as MISP or OpenCTI. The script is designed to handle large datasets efficiently, including mechanisms to check for
data changes and avoid unnecessary processing.
"""

import requests
import os
import gzip
import hashlib
import logging
import json
import csv
from datetime import datetime, timedelta
import re
from zero_noise_ips import is_non_public_ip, update_consolidated_ips  # Import necessary functions


def get_hash_of_file(file_content):
    """Calculates the MD5 hash of the file content."""
    return hashlib.md5(file_content).hexdigest()

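# Note: the MD5 digest above is used purely as a lightweight change-detection
# checksum for the downloaded feed; it is not relied on for any security property.
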
def process_json_to_csv(gz_path, csv_path, consolidated_ips):
    """
    Processes a gzipped JSON file, extracts unique public IPs not in consolidated_ips,
    and writes them to a CSV, checking for changes.
    """
    with gzip.open(gz_path, 'rt', encoding='utf-8') as f_in:
        data = json.load(f_in)  # Parse JSON directly from the file object
    current_hash = get_hash_of_file(json.dumps(data).encode('utf-8'))  # Hash the parsed data

    # Check if the CSV file exists and if its stored hash matches the current data
    if os.path.exists(csv_path):
        with open(csv_path, 'r') as f:
            existing_hash = f.readline().strip()
        if existing_hash == current_hash:
            logging.info("No changes in data. Existing data is up-to-date.")
            return False

    unique_ips = set()
    rows = []
    for entry in data:
        ip = entry.get('ip')
        if ip and not is_non_public_ip(ip) and ip not in consolidated_ips and ip not in unique_ips:
            unique_ips.add(ip)
            rows.append({'dst-ip': ip})

    if not rows:
        logging.info("No new IPs to write after filtering.")
        return False

    with open(csv_path, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=['dst-ip'])
        writer.writeheader()
        writer.writerows(rows)

    # Store the hash on the first line of the file for change detection on the next run
    with open(csv_path, 'r+') as f:
        content = f.read()
        f.seek(0, 0)
        f.write(current_hash + '\n' + content)

    logging.info(f"Processed JSON data and wrote it to CSV at {csv_path}")
    return True

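# The resulting haas_feed.csv therefore looks like this (values illustrative):
#
#   5d41402abc4b2a76b9719d911017c592    <- MD5 of the parsed feed, used for change detection
#   dst-ip
#   203.0.113.45
#   198.51.100.12
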
def download_latest_file(_base_url, _download_dir, _extract_dir, consolidated_ips, days_delay=1):
    """Downloads the latest HaaS feed file, processes it, and cleans up."""

    def clean_directory(directory, pattern):
        for filename in os.listdir(directory):
            if re.match(pattern, filename):
                filepath = os.path.join(directory, filename)
                os.remove(filepath)

    clean_directory(_download_dir, r'\d{4}-\d{2}-\d{2}\.json\.gz')  # Clean directory of old files
    os.makedirs(_extract_dir, exist_ok=True)

    target_date = datetime.now() - timedelta(days=days_delay)
    file_date = target_date.strftime("%Y-%m-%d")
    year = target_date.strftime("%Y")
    month = target_date.strftime("%m")
    file_name = f"{file_date}.json.gz"
    gz_path = os.path.join(_download_dir, file_name)
    csv_path = os.path.join(_extract_dir, "haas_feed.csv")
    url = f"{_base_url}/{year}/{month}/{file_name}"

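    # For example, with days_delay=1 on 15 October 2024 this resolves to
    # https://haas.nic.cz/stats/export/2024/10/2024-10-14.json.gz
    # (assuming the base_url configured in the main block below).
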
logging.info(f"Attempting to download file from {url}")
102+
try:
103+
response = requests.get(url)
104+
if response.status_code == 200 and response.content:
105+
with open(gz_path, 'wb') as f:
106+
f.write(response.content)
107+
logging.info(f"Downloaded {file_name} to {gz_path}")
108+
109+
if process_json_to_csv(gz_path, csv_path, consolidated_ips):
110+
os.remove(gz_path) # Delete the gzip file only if CSV was updated
111+
else:
112+
os.remove(gz_path)
113+
else:
114+
logging.error(f"Failed to download or empty content: {url}, Status Code: {response.status_code}, Content-Length: {response.headers.get('Content-Length', 'Unknown')}")
115+
except Exception as e:
116+
logging.error(f"Error downloading or processing file: {e}")
117+
raise
118+
119+
# Logging configuration
logging.basicConfig(
    filename='/var/log/local_feeds.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - haas_ioc_feed.py: %(message)s'
)

# Main execution (if run as a script)
if __name__ == "__main__":
    # Update the consolidated IPs and load them into a set
    logging.info("Updating consolidated IPs from zero_noise_ips.py...")
    consolidated_ips_set = update_consolidated_ips()
    if not consolidated_ips_set:
        # Fall back to the cached file if the update returned nothing
        with open('consolidated_ips.json', 'r') as file:
            consolidated_ips_set = set(json.load(file))
    logging.info("Consolidated IPs have been updated.")

    base_url = "https://haas.nic.cz/stats/export"
    download_dir = "/var/www/MISP/app/files/feeds"
    extract_dir = "/var/www/MISP/app/files/feeds/HaaS"
    download_latest_file(base_url, download_dir, extract_dir, consolidated_ips_set, days_delay=1)
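This commit does not include zero_noise_ips.py, so the two imported helpers are known only by name and by how they are called above. A minimal sketch of what they might look like, assuming is_non_public_ip simply rejects anything the standard ipaddress module does not consider globally routable, and that update_consolidated_ips returns the consolidated benign/irrelevant addresses as a set:

# zero_noise_ips.py -- illustrative sketch only; the real module is not part of this commit
import ipaddress
import json

def is_non_public_ip(ip):
    """Return True for private, loopback, link-local, multicast, reserved
    and other special-use addresses, and for strings that fail to parse."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return True
    return not addr.is_global

def update_consolidated_ips():
    """Return the consolidated benign/irrelevant IPs as a set.
    (The real implementation presumably also refreshes consolidated_ips.json
    from its upstream sources before returning it.)"""
    with open('consolidated_ips.json', 'r') as f:
        return set(json.load(f))

Because each HaaS dump is published under the previous day's date, the script is presumably run once per day (days_delay=1), for example from cron:

0 6 * * * /var/www/MISP/venv/bin/python /path/to/haas_ioc_feed.py

The cron schedule and script path are illustrative only.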
