# ssdr.py
"""
This script auto-inlines images, JavaScript, and CSS.
It also resolves iframes to depth MAX_DEPTH (3 by default).
"""
import base64
import functools
import logging
import re
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
from mitmproxy import http, ctx
from mitmproxy.script import concurrent
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Configurable max-depth
# This is a simple way to avoid infinite loops, but it also limits the
# performance improvement to a factor of MAX_DEPTH. This could be solved
# by implementing a cycle-checker.
MAX_DEPTH = 3
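# A minimal sketch of the cycle-checker idea (hypothetical; not wired into
# inline_html): thread a set of already-seen URLs through the recursion and
# skip anything visited before, which would remove the need for a hard depth cap.
def _already_visited(seen: set, url: str) -> bool:
    """Return True if url was fetched before; otherwise record it and return False."""
    if url in seen:
        return True
    seen.add(url)
    return False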
# Disable cert validation; needed since our localhost certs are self-signed
requests.get = functools.partial(requests.get, verify=False)
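# Assumption: with verify=False every request emits an InsecureRequestWarning;
# requests bundles/re-exports urllib3, so we can silence that noise here.
requests.packages.urllib3.disable_warnings()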
def img_retrieve_source_base64(base_uri: str, old_src: str):
    try:
        resp = requests.get(old_src)
    except requests.exceptions.MissingSchema:
        # Relative src: resolve it against the page's base URI
        new_uri = f"{base_uri}/{old_src}"
        resp = requests.get(new_uri)
    try:
        resp.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # We can't signal an error here, so let the browser handle the 404
        logger.info(f"Error in img {old_src}: {e}")
        return None
    return f"data:{resp.headers['content-type']};base64,{base64.b64encode(resp.content).decode()}"
def script_get_source_string(base_uri: str, old_src: str):
logger.debug(f"script called for {old_src}")
try:
resp = requests.get(old_src)
except requests.exceptions.MissingSchema:
new_uri = f"{base_uri}/{old_src}"
resp = requests.get(new_uri)
try:
resp.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.info(f"HTTPError for {old_src}: {e}")
return None
return resp.text
# Regex matching relative CSS urls; 3rd group contains the relative url
# tested on examples from https://developer.mozilla.org/en-US/docs/Web/CSS/url()
# using https://regex101.com/
CSS_URL_REGEX = re.compile(r"url\(([\'\"]?)(?!(https?://|data:|#))([^\'\"\s\)]*)\1\)")
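# e.g. it matches url(images/bg.png) and url('fonts/a.woff'), but skips
# url(https://cdn.example.com/x.png), url(data:image/png;base64,...), url(#filter)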
def absolutize_css_urls(base_uri: str, match: re.Match) -> str:
# CSS URLs are relative to the CSS document, so special absolutizing is needed
logger.debug(f'replacing {match.group(0)} with url({match.group(1)}{base_uri}/{match.group(3)}{match.group(1)})')
return f'url({match.group(1)}{base_uri}/{match.group(3)}{match.group(1)})'
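# e.g. with base_uri "https://host/assets/css":
#   url('img/bg.png') -> url('https://host/assets/css/img/bg.png')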
def css_get_source_string(base_uri: str, old_src: str):
logger.debug(f"css called for {old_src}")
uri = old_src
try:
resp = requests.get(old_src)
except requests.exceptions.MissingSchema:
uri = f"{base_uri}/{old_src}"
resp = requests.get(uri)
try:
resp.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.info(f"HTTPError for {old_src}: {e}")
return None
    # Relative URLs inside the stylesheet resolve against the stylesheet's own
    # directory (not the page), so rewrite them against that directory
uri_dir = uri[:uri.rfind('/')]
result = re.sub(CSS_URL_REGEX, functools.partial(absolutize_css_urls, uri_dir), resp.text)
return result
def iframe_get_source_string(base_uri: str, old_src: str, executor: ThreadPoolExecutor, depth: int):
    logger.debug(f"iframe called for {old_src}")
    try:
        resp = requests.get(old_src)
    except requests.exceptions.MissingSchema:
        new_uri = f"{base_uri}/{old_src}"
        resp = requests.get(new_uri)
    try:
        # Check the status outside the MissingSchema handler; previously an
        # HTTPError on an absolute URL escaped this function entirely
        resp.raise_for_status()
    except requests.exceptions.HTTPError as e:
        logger.info(f"HTTPError for {old_src}: {e}")
        return None
html = BeautifulSoup(resp.content, "html.parser")
logger.debug(f'recursing for {old_src} with depth {depth+1}')
return str(inline_html(html, base_uri, executor, depth=depth+1)) # recursively resolve iframes
def inline_html(html: BeautifulSoup, base_uri: str, executor: ThreadPoolExecutor, depth: int = 0) -> BeautifulSoup:
    if not html.body or depth > MAX_DEPTH:
        return html
    scripts = [script for script in html.find_all('script') if script.get('src')]
    images = [img for img in html.find_all('img') if img.get('src')]
    styles = [
        link for link in html.find_all('link')
        # bs4 parses rel as a list of tokens, so this also catches
        # multi-token values like rel="preload stylesheet"
        if 'stylesheet' in (link.get('rel') or []) and link.get('href')
    ]
    iframes = [iframe for iframe in html.find_all('iframe') if iframe.get('src')]
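    # Fan-out: fetch every referenced resource concurrently on the shared pool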
    image_results = [
        executor.submit(img_retrieve_source_base64, base_uri, image['src'])
        for image in images
    ]
    script_results = [
        executor.submit(script_get_source_string, base_uri, script['src'])
        for script in scripts
    ]
    style_results = [
        executor.submit(css_get_source_string, base_uri, style['href'])
        for style in styles
    ]
    iframe_results = [
        executor.submit(iframe_get_source_string, base_uri, iframe['src'], executor, depth)
        for iframe in iframes
    ]
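    # Fan-in: block on the futures in submission order and splice the fetched
    # content back into the parse tree; a None result leaves the tag untouched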
for (image, result) in zip(images, image_results):
r = result.result()
if r:
image['src'] = r
for (script, result) in zip(scripts, script_results):
r = result.result()
if r:
del script['src']
script.string = r
for (style, result) in zip(styles, style_results):
r = result.result()
if r:
new_style = html.new_tag('style')
new_style.string = r
style.insert_after(new_style)
style.decompose()
for (iframe, result) in zip(iframes, iframe_results):
r = result.result()
if r:
iframe['srcdoc'] = r
return html
class SSDR:
max_workers = 6 # same as Firefox default
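    # Caveat: iframe tasks recursively submit more work to this same pool and
    # block on the results, so deeply nested iframes can exhaust the workers;
    # the MAX_DEPTH cap also bounds that risk.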
@concurrent
def response(self, flow: http.HTTPFlow) -> None:
original_base_uri = f"{flow.request.scheme}://{flow.request.host}"
if flow.request.port not in [80, 443]:
# only add the port if it's nonstandard; this allows the replace to work as expected
original_base_uri += f':{flow.request.port}'
html = BeautifulSoup(flow.response.content, "html.parser")
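        # Non-HTML bodies (JS, JSON, images, ...) parse without a <body> tag
        # and fall through the check below untouched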
if html.body:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
flow.response.text = str(inline_html(html, original_base_uri, executor))
new_base_uri = f"{flow.request.scheme}://{ctx.options.listen_host}:{ctx.options.listen_port}"
flow.replace(original_base_uri, new_base_uri)
addons = [
SSDR()
]
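# Usage sketch (-s, --listen-host, and --listen-port are standard mitmproxy
# flags, but verify against your mitmproxy version):
#   mitmdump -s ssdr.py --listen-host 127.0.0.1 --listen-port 8080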