-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathjson2haproxy.py
222 lines (181 loc) · 8.52 KB
/
json2haproxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import os
import json
import re
import logging
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from functools import lru_cache
# --- Configuration ---
# Verbosity of the script's own logging (DEBUG, INFO, WARNING, ERROR).
LOG_LEVEL = logging.INFO

# Directory that receives the generated file; the OUTPUT_DIR environment
# variable overrides the built-in default.
OUTPUT_DIR = Path(os.environ.get("OUTPUT_DIR", "waf_patterns/haproxy/"))

# JSON rule file to read; the INPUT_FILE environment variable overrides the
# built-in default.
INPUT_FILE = Path(os.environ.get("INPUT_FILE", "owasp_rules.json"))

# ModSecurity directives with no translation here; a pattern containing any of
# them is skipped by sanitize_pattern().
UNSUPPORTED_PATTERNS = [
    # Core unsupported
    "@pmFromFile",
    "@detectSQLi",
    "@validateByteRange",
    "@detectXSS",
    # Add any other unsupported patterns discovered during testing
]
# Operator Mapping: ModSecurity -> HAProxy
# The table is assembled from three groups; the merge below keeps the groups
# in this order, which is also the order sanitize_pattern() probes them in.

# String comparisons
_STRING_OPERATORS = {
    "@streq": "str -m str",
    "@endsWith": "str -m end",
    "@contains": "str -m sub",
    "!@eq": "str -m !str",  # Negated string equality
    "!@within": "str -m !reg",  # Negated regex (approximate)
}

# Integer comparisons (sanitize_pattern() tags these as "int")
_INTEGER_OPERATORS = {
    "@lt": "<",
    "@ge": ">=",
    "@gt": ">",
    "@eq": "==",
}

# IP address matching
_ADDRESS_OPERATORS = {
    "@ipMatch": "src_ip",
}

OPERATOR_MAP = {
    **_STRING_OPERATORS,
    **_INTEGER_OPERATORS,
    **_ADDRESS_OPERATORS,
}
# --- Logging Setup ---
# Configure the root logger once, then take a module-level logger that the
# functions below write to.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=LOG_LEVEL,
)
logger = logging.getLogger(__name__)
# --- Utility Functions ---
@lru_cache(maxsize=None)  # Remember the verdict for every pattern already checked
def validate_regex(pattern: str) -> bool:
    """Tell whether ``pattern`` compiles as a Python regular expression.

    A pattern holding more than five ``.*`` sequences triggers a warning about
    its complexity but is still compiled; the warning never changes the result.

    Args:
        pattern: Regular expression source text.

    Returns:
        ``True`` when ``re.compile`` accepts the pattern, ``False`` when it
        raises ``re.error`` (the error is logged as a warning).
    """
    wildcard_count = pattern.count(".*")
    if wildcard_count > 5:  # Basic complexity check
        logger.warning(f"Regex potentially too complex: {pattern}")
        # Optionally return False here to *reject* complex regexes

    try:
        re.compile(pattern)
    except re.error as exc:
        logger.warning(f"Invalid regex: {pattern} - {exc}")
        return False
    return True
def load_owasp_rules(file_path: Path) -> List[Dict]:
    """Load the OWASP rule definitions from a JSON file.

    Args:
        file_path: Location of the JSON document to read.

    Returns:
        Whatever ``json.load`` decodes from the file. The annotation expects a
        list of rule dictionaries, but the shape is not checked here.

    Raises:
        Exception: Any failure while opening, decoding or parsing the file
            (for example ``FileNotFoundError`` or ``json.JSONDecodeError``) is
            logged and then re-raised unchanged.
    """
    try:
        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's default text encoding.
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        # Catching Exception already covers the file-not-found and JSON-decode
        # cases; every failure gets the same log line before propagating.
        logger.error(f"Error loading rules from {file_path}: {e}")
        raise  # Re-raise to prevent the script from continuing
def _sanitize_regex_pattern(pattern: str) -> str:
"""Helper function to clean up regex patterns."""
pattern = pattern.replace("@rx ", "").strip()
pattern = re.sub(r"\(\?i\)", "", pattern) # Remove (?i)
pattern = pattern.replace("$", r"\$") # $ -> \$
pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern) # {
pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern) # }
pattern = re.sub(r"\\\.\*", r"\.*", pattern) # Remove unnecessary escapes
pattern = re.sub(r"(?<!\\)\.(?![\w])", r"\.", pattern) # Escape .
pattern = re.sub(r"\(\?:", "(", pattern) # (?: -> (
return pattern
def sanitize_pattern(pattern: str, location: str) -> Tuple[Optional[str], str]:
    """Convert a ModSecurity pattern to its HAProxy equivalent.

    Checks are made in this order:

    1. If the pattern starts with a key of ``OPERATOR_MAP``, that leading
       operator is swapped for the mapped HAProxy text. Integer operators
       (``<``, ``>=``, ``>``, ``==``) yield type ``"int"``; all other mapped
       operators yield ``"hdr_sub"``.
    2. If the pattern contains any entry of ``UNSUPPORTED_PATTERNS`` it is
       rejected with a warning.
    3. If the pattern contains ``@rx`` it is cleaned by
       ``_sanitize_regex_pattern`` and typed ``"hdr_reg"``.
    4. Anything else is returned unchanged as a plain ``"hdr_sub"`` match.

    Args:
        pattern: Raw pattern text from a rule.
        location: Request part the rule targets. Accepted for interface
            compatibility; it is not used by the conversion.

    Returns:
        ``(sanitized_pattern, acl_type)``, or ``(None, "")`` when the pattern
        is unsupported.
    """
    # 1. Handle ModSecurity operators *first*.
    for modsec_op, haproxy_op in OPERATOR_MAP.items():
        if not pattern.startswith(modsec_op):
            continue
        # Swap only the leading operator. A blanket str.replace() would also
        # rewrite the same text if it happened to recur inside the operand.
        converted = (haproxy_op + pattern[len(modsec_op):]).strip()
        if haproxy_op in ("<", ">=", ">", "=="):
            # Integer comparisons are emitted separately by the caller.
            return converted, "int"
        return converted, "hdr_sub"

    # 2. Check for unsupported patterns *after* operator handling.
    for directive in UNSUPPORTED_PATTERNS:
        if directive in pattern:
            logger.warning(f"Skipping unsupported pattern (contains {directive}): {pattern}")
            return None, ""

    # 3. Handle regular expressions (@rx)
    if "@rx" in pattern:
        return _sanitize_regex_pattern(pattern), "hdr_reg"

    # 4. If no operator and no @rx, assume it's a simple string match
    return pattern, "hdr_sub"
def generate_haproxy_conf(rules: List[Dict]) -> None:
    """Generate the HAProxy WAF configuration file ``waf.acl`` in ``OUTPUT_DIR``.

    Each rule dictionary is read for ``id``, ``category``, ``location``,
    ``pattern`` and ``severity``. Only ``pattern`` is mandatory; the others
    default to ``"no_id"``, ``"uncategorized"``, ``"User-Agent"`` and
    ``"medium"``. Category, location and severity are lower-cased before use.
    Patterns are converted with ``sanitize_pattern``; rules it rejects, and
    string rules whose location is not handled here, are skipped.

    The file is written in three sections: integer-comparison
    ``http-request`` lines (if any), ``acl`` lines grouped by location, and
    the action lines that reference the ACL names by severity
    (high -> deny, medium -> log, low -> tarpit).

    Args:
        rules: Rule dictionaries, as loaded from the OWASP JSON file.

    Raises:
        Exception: Any failure (for example ``KeyError`` for a rule without
            ``pattern``, or an I/O error) is logged and re-raised.
    """
    try:
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        config_file = OUTPUT_DIR / "waf.acl"

        acl_rules: Dict[str, List[str]] = {}  # { location: [acl lines] }
        int_rules: List[str] = []
        deny_high: List[str] = []
        log_medium: List[str] = []
        tarpit_low: List[str] = []
        # Severity -> list collecting the ACL names that trigger its action.
        # Any other severity still gets its ACL line but no action.
        names_by_severity = {
            "high": deny_high,
            "medium": log_medium,
            "low": tarpit_low,
        }

        # Process each OWASP rule
        for rule in rules:
            rule_id = rule.get("id", "no_id")
            category = rule.get("category", "uncategorized").lower()
            location = rule.get("location", "User-Agent").lower()  # compared in lowercase below
            pattern = rule["pattern"]
            severity = rule.get("severity", "medium").lower()

            sanitized_pattern, acl_type = sanitize_pattern(pattern, location)
            if sanitized_pattern is None:  # Unsupported/invalid pattern
                continue

            if acl_type == "int":  # Int comparison
                action = "deny" if severity == "high" else "log" if severity == "medium" else "tarpit"
                # Special cases: some locations cannot be used directly
                if location in ("query-string", "request-uri"):
                    int_rules.append(f"http-request {action} if {{ {location} {sanitized_pattern} }}")
                else:
                    int_rules.append(f"http-request {action} if {{ {location},{sanitized_pattern} }}")
                continue

            if acl_type not in ("hdr_reg", "hdr_sub"):
                continue

            # String comparison
            acl_name = f"block_{category}_{rule_id}"

            # Build the ACL rule string
            if location == "request-uri":
                acl_string = f"acl {acl_name} path_reg -i {sanitized_pattern}"
            elif location == "query-string":
                # There is no "url_param_reg" keyword; the `query` fetch
                # returns the whole query string and `-m reg` applies a regex
                # match to it.
                acl_string = f"acl {acl_name} query -m reg -i {sanitized_pattern}"
            elif location in ("host", "content-type", "referer", "user-agent"):
                # Pass the header name through unchanged: stripping the dash
                # would ask for headers named "useragent"/"contenttype".
                # acl_type is already the matcher name (hdr_reg or hdr_sub).
                acl_string = f"acl {acl_name} {acl_type}({location}) -i {sanitized_pattern}"
            else:
                logger.warning(f"Unsupported location: {location} for rule: {rule_id}")
                continue  # Skip unsupported locations

            acl_rules.setdefault(location, []).append(acl_string)

            bucket = names_by_severity.get(severity)
            if bucket is not None:
                bucket.append(acl_name)

        # Write the configuration to the file
        with open(config_file, "w", encoding="utf-8") as f:
            f.write("# HAProxy WAF ACL rules\n\n")

            # Integer Comparison Rules (if any)
            if int_rules:
                f.write("# Integer Comparison Rules\n")
                for int_line in int_rules:
                    f.write(f"{int_line}\n")
                f.write("\n")

            # ACL Rules (by location); loop names are distinct from the
            # `rules` parameter so it is not shadowed.
            for acl_location, acl_lines in acl_rules.items():
                f.write(f"# Rules for {acl_location.title()}\n")
                for acl_line in acl_lines:
                    f.write(f"{acl_line}\n")
                f.write("\n")

            # Deny/Action Logic
            f.write("# Deny/Action Logic\n")
            if deny_high:
                f.write(f"http-request deny if {' or '.join(deny_high)}\n")
            if log_medium:
                # NOTE(review): `http-request log` does not look like a
                # documented HAProxy action - confirm before deploying.
                f.write(f"http-request log if {' or '.join(log_medium)}\n")
            if tarpit_low:
                f.write(f"http-request tarpit if {' or '.join(tarpit_low)}\n")

        logger.info(f"HAProxy WAF configuration generated at: {config_file}")
    except Exception as e:
        logger.error(f"Error generating HAProxy configuration: {e}")
        raise
def main() -> None:
    """Load the rules from ``INPUT_FILE`` and generate the HAProxy configuration.

    Progress is logged at INFO level.

    Raises:
        SystemExit: With exit status 1 when loading or generation fails; the
            failure is logged at CRITICAL level first.
    """
    try:
        logger.info("Loading OWASP rules...")
        owasp_rules = load_owasp_rules(INPUT_FILE)
        logger.info(f"Loaded {len(owasp_rules)} rules.")

        logger.info("Generating HAProxy WAF configuration...")
        generate_haproxy_conf(owasp_rules)

        logger.info("HAProxy WAF generation complete.")
    except Exception as e:
        logger.critical(f"Script failed: {e}")
        # The bare `exit` name is injected by the `site` module for interactive
        # use and can be missing (e.g. under `python -S`); raising SystemExit
        # directly gives the same exit status without that dependency.
        raise SystemExit(1) from e
# Run the generator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()