-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathjson2nginx.py
223 lines (188 loc) · 9.52 KB
/
json2nginx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import json
import os
import re
import logging
from pathlib import Path
from collections import defaultdict
from functools import lru_cache
from typing import List, Dict, Optional, Tuple
# --- Configuration ---
# Verbosity of this script's own logging: DEBUG, INFO, WARNING or ERROR.
LOG_LEVEL = logging.INFO

# Input and output locations; INPUT_FILE and OUTPUT_DIR may be overridden
# through environment variables of the same name.
INPUT_FILE = Path(os.environ.get("INPUT_FILE", "owasp_rules.json"))
OUTPUT_DIR = Path(os.environ.get("OUTPUT_DIR", "waf_patterns/nginx"))
MAPS_FILE = OUTPUT_DIR.joinpath("waf_maps.conf")
RULES_FILE = OUTPUT_DIR.joinpath("waf_rules.conf")

# Substrings that mark a pattern as untranslatable (expand as needed).
UNSUPPORTED_PATTERNS = [
    "@pmFromFile",  # No direct file lookups in Nginx map
]

# --- Logging Setup ---
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=LOG_LEVEL,
)
logger = logging.getLogger(__name__)
# --- Utility Functions ---
def load_owasp_rules(file_path: Path) -> List[Dict]:
    """Load the OWASP rule list from a JSON file.

    Args:
        file_path: Path of the JSON file to read (decoded as UTF-8).

    Returns:
        The parsed top-level JSON array.

    Raises:
        OSError: The file is missing or cannot be read.
        json.JSONDecodeError: The file does not contain valid JSON.
        ValueError: The JSON document's top-level value is not an array.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            rules = json.load(f)
    # FileNotFoundError is a subclass of OSError, so OSError alone covers it.
    except (OSError, json.JSONDecodeError) as e:
        logger.error(f"Error loading rules from {file_path}: {e}")
        raise  # Re-raise to prevent continuing

    # Fail here with a clear message rather than letting a dict or string
    # be iterated as if it were a list of rule objects further down.
    if not isinstance(rules, list):
        raise ValueError(
            f"Expected a JSON array of rules in {file_path}, "
            f"got {type(rules).__name__}"
        )
    return rules
@lru_cache(maxsize=256)  # Increased cache size
def validate_regex(pattern: str) -> bool:
    """Report whether *pattern* compiles with Python's ``re`` module.

    This is only a basic check. A pattern that fails to compile is logged
    as a warning. Results are memoised, so a repeated pattern is compiled
    (and, if invalid, logged) only while it is absent from the cache.
    """
    try:
        re.compile(pattern)
    except re.error as exc:
        logger.warning(f"Invalid regex: {pattern} - {exc}")
        return False
    else:
        return True
def _sanitize_pattern(pattern: str) -> str:
"""Internal helper to clean and escape patterns for Nginx."""
pattern = pattern.replace("@rx ", "").strip() # Remove ModSecurity @rx
# Remove case-insensitive flag (?i) as Nginx uses ~* for that
pattern = re.sub(r"\(\?i\)", "", pattern)
# Convert $ to \$
pattern = pattern.replace("$", r"\$")
# Convert { or { to {
pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern)
pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern)
# Remove unnecessary \.*
pattern = re.sub(r"\\\.\*", r"\.*", pattern)
pattern = re.sub(r"(?<!\\)\.(?![\w])", r"\.", pattern) # Escape dots
# Replace non-capturing groups (?:...) with capturing groups (...)
pattern = re.sub(r"\(\?:", "(", pattern)
return pattern
def sanitize_pattern(pattern: str, location: str) -> Optional[str]:
    """Sanitize a pattern for use as a regex key in an Nginx map directive.

    Args:
        pattern: Raw rule pattern.
        location: Rule location; currently unused, kept for interface
            compatibility.

    Returns:
        The sanitized pattern, safe to embed in a double-quoted Nginx string,
        or None if the pattern contains an entry of UNSUPPORTED_PATTERNS.
    """
    # Skip unsupported patterns.
    if any(unsupported in pattern for unsupported in UNSUPPORTED_PATTERNS):
        logger.warning(f"Skipping unsupported pattern: {pattern}")
        return None

    # Sanitize the pattern
    pattern = _sanitize_pattern(pattern)

    # Escape special characters for Nginx map (most importantly, the ~)
    # We use re.escape, but *selectively* unescape key regex metacharacters.
    pattern = re.escape(pattern)
    # Unescape: \. \( \) \[ \] \| \? \* \+ \{ \} \^ \$ \\ \-
    # The hyphen must be restored as well: re.escape turns "[a-z]" into
    # "[a\-z]", which matches only 'a', '-' and 'z' instead of the range.
    pattern = re.sub(r'\\([.()[\]|?*+{}^$\\-])', r'\1', pattern)

    # The caller wraps the result in double quotes, so a bare '"' would end
    # the Nginx string early. Walk the pattern escape-by-escape so that an
    # already escaped quote (\") is left alone and only bare quotes gain a
    # backslash.
    # NOTE(review): Nginx appears to collapse "\\" to "\" inside quoted
    # strings as well; patterns matching a literal backslash may need extra
    # escaping -- confirm against Nginx behaviour.
    pattern = re.sub(
        r'\\.|"',
        lambda m: '\\"' if m.group(0) == '"' else m.group(0),
        pattern,
        flags=re.DOTALL,
    )
    return pattern
# Nginx variable inspected for each supported rule location.
# Add more location mappings here.
_LOCATION_VARIABLES: Dict[str, str] = {
    "request-uri": "$request_uri",
    "query-string": "$args",  # Use $args for query string
    "user-agent": "$http_user_agent",
    "host": "$http_host",
    "referer": "$http_referer",
    "content-type": "$http_content_type",
}

# Static README content written next to the generated configuration.
_README_LINES = (
    "# Nginx WAF Configuration\n\n",
    "This directory contains Nginx WAF configuration files generated from OWASP rules.\n\n",
    "## Usage\n\n",
    "1. **Include `waf_maps.conf` in your `http` block:**\n",
    " ```nginx\n",
    " http {\n",
    " include /path/to/waf_patterns/nginx/waf_maps.conf;\n",
    " # ... other http configurations ...\n",
    " }\n",
    " ```\n\n",
    # The rules use add_header inside "if", which Nginx only permits for an
    # "if" located in a location block, so a server-level include would fail.
    "2. **Include `waf_rules.conf` in a `location` block:**\n",
    " ```nginx\n",
    " location / {\n",
    " # ... other location configurations ...\n",
    " include /path/to/waf_patterns/nginx/waf_rules.conf;\n",
    " }\n",
    " ```\n\n",
    "3. **Reload Nginx:**\n",
    " ```bash\n",
    " sudo nginx -t && sudo systemctl reload nginx\n",
    " ```\n\n",
    "## Important Notes:\n\n",
    "* **Testing is Crucial:** Thoroughly test your WAF configuration with a variety of requests (both legitimate and malicious) to ensure it's working correctly and not causing false positives.\n",
    "* **False Positives:** WAF rules, especially those based on regex, can sometimes block legitimate traffic. Monitor your Nginx logs and adjust the rules as needed.\n",
    "* **Performance:** Complex regexes can impact performance. Use the simplest regex that accurately matches the threat.\n",
    "* **Updates:** Regularly update the OWASP rules (by re-running `owasp2json.py` and `json2nginx.py`) to stay protected against new threats.\n",
    "* **This is not a complete WAF:** This script provides a basic WAF based on pattern matching. For more comprehensive protection, consider using a dedicated WAF solution like Nginx App Protect or ModSecurity.\n",
)


def _waf_variable(category: str, source_variable: str) -> str:
    """Return the map result variable name for a (category, source) pair."""
    return f"$waf_{category}_{source_variable.lstrip('$')}"


def _collect_map_entries(rules: List[Dict]) -> Dict[Tuple[str, str], List[str]]:
    """Group usable rules into map entry lines keyed by (category, variable)."""
    entries: Dict[Tuple[str, str], List[str]] = defaultdict(list)
    for rule in rules:
        rule_id = rule.get("id", "no_id")
        pattern = rule.get("pattern")
        if not isinstance(pattern, str) or not pattern:
            # One malformed rule must not abort the whole generation run.
            logger.warning(f"Skipping rule without a pattern: {rule_id}")
            continue
        # The category becomes part of an Nginx variable name, which may only
        # contain letters, digits and underscores.
        category = re.sub(r"[^a-z0-9_]", "_", rule.get("category", "generic").lower())
        location = rule.get("location", "request-uri").lower()  # default location
        severity = rule.get("severity", "medium").lower()

        sanitized_pattern = sanitize_pattern(pattern, location)
        if not sanitized_pattern or not validate_regex(sanitized_pattern):
            continue  # Skip invalid or unsupported patterns

        variable = _LOCATION_VARIABLES.get(location)
        if variable is None:
            logger.warning(f"Unsupported location: {location} for rule: {rule_id}")
            continue

        # The severity is the value the map yields when the regex matches.
        entries[(category, variable)].append(f' "~*{sanitized_pattern}" {severity};\n')
    return entries


def _render_maps(entries: Dict[Tuple[str, str], List[str]]) -> List[str]:
    """Render one map block per (category, source variable) pair."""
    lines = [
        "# Nginx WAF Maps (Generated by json2nginx.py)\n\n",
        # No "http { }" wrapper is emitted: the file is meant to be included
        # from inside the existing http block, and http cannot be nested.
        "# Include this file inside your 'http' block.\n\n",
    ]
    for (category, variable), map_lines in entries.items():
        # Each map is keyed on the request variable its rules apply to.
        lines.append(f"map {variable} {_waf_variable(category, variable)} {{\n")
        lines.append(' default "";\n')  # default value empty
        lines.append(f" # Rules for {variable}\n")
        lines.extend(map_lines)
        lines.append("}\n\n")
    return lines


def _render_rules(entries: Dict[Tuple[str, str], List[str]]) -> List[str]:
    """Render the per-severity actions for every generated map variable."""
    lines = [
        "# Nginx WAF Rules (Generated by json2nginx.py)\n\n",
        "# Include this file in a 'location' block ('add_header' is not allowed in a server-level 'if').\n\n",
    ]
    for category, variable in entries:
        map_variable = _waf_variable(category, variable)
        lines.append(f' if ({map_variable} = "high") {{\n return 403;\n }}\n')
        # example for another action
        lines.append(f' if ({map_variable} = "medium") {{\n add_header X-WAF-Blocked "medium-{category}";\n }}\n')
        lines.append(f' if ({map_variable} = "low") {{\n add_header X-WAF-Blocked "low-{category}";\n }}\n\n')
    return lines


def _write_lines(path: Path, lines) -> None:
    """Write *lines* to *path* as UTF-8, logging and re-raising I/O errors."""
    try:
        with open(path, "w", encoding="utf-8") as f:
            f.writelines(lines)
    except IOError as e:
        logger.error(f"Error writing to {path}: {e}")
        raise


def generate_nginx_waf(rules: List[Dict]) -> None:
    """Generate the Nginx WAF configuration (maps, rules and README).

    Writes MAPS_FILE, RULES_FILE and README.md into OUTPUT_DIR, creating the
    directory if needed. Rules are grouped by category and by the request
    variable they inspect; each group gets its own map whose result variable
    is named ``$waf_<category>_<source variable>`` and holds the severity of
    the matching rule. Rules without a usable pattern, with an unsupported
    pattern or location, or with an invalid regex are skipped.

    Args:
        rules: Rule dictionaries; the keys read are ``id``, ``category``,
            ``location``, ``pattern`` and ``severity``.

    Raises:
        IOError: One of the output files could not be written.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    entries = _collect_map_entries(rules)

    # --- Generate Maps (waf_maps.conf) ---
    _write_lines(MAPS_FILE, _render_maps(entries))
    logger.info(f"Generated Nginx map file: {MAPS_FILE}")

    # --- Generate Rules (waf_rules.conf) ---
    _write_lines(RULES_FILE, _render_rules(entries))
    logger.info(f"Generated Nginx rules file: {RULES_FILE}")

    # --- Generate README ---
    _write_lines(OUTPUT_DIR / "README.md", _README_LINES)
def main() -> None:
    """Load the OWASP rules and generate the Nginx WAF configuration.

    Any failure is logged at CRITICAL level and the process terminates with
    exit status 1.
    """
    try:
        logger.info("Loading OWASP rules...")
        owasp_rules = load_owasp_rules(INPUT_FILE)
        logger.info(f"Loaded {len(owasp_rules)} rules.")
        logger.info("Generating Nginx WAF configuration...")
        generate_nginx_waf(owasp_rules)
        logger.info("Nginx WAF generation complete.")
    except Exception as e:
        logger.critical(f"Script failed: {e}")
        # The bare exit() helper is injected by the site module for
        # interactive use and may be missing; SystemExit always works.
        raise SystemExit(1) from e  # Exit with an error code


if __name__ == "__main__":
    main()