Skip to content

Commit

Permalink
Validating entered data, not silently stripping port numbers or URLs (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kazet authored Mar 20, 2024
1 parent 03043b8 commit ec4c763
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 38 deletions.
3 changes: 1 addition & 2 deletions artemis/csrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def validate_csrf(func: Any) -> Any:
@wraps(func)
async def wrapper(request: Request, csrf_protect: CsrfProtect, *args: Any, **kwargs: Any) -> Any:
await csrf_protect.validate_csrf(request)
response = await func(request, *args, **kwargs)
csrf_protect.unset_csrf_cookie(response)
response = await func(request, *args, csrf_protect=csrf_protect, **kwargs)
return response

return wrapper
Expand Down
20 changes: 20 additions & 0 deletions artemis/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from artemis.db import DB, ColumnOrdering, TaskFilter
from artemis.json_utils import JSONEncoderAdditionalTypes
from artemis.karton_utils import restart_crashed_tasks
from artemis.modules.classifier import Classifier
from artemis.producer import create_tasks
from artemis.templating import templates

Expand Down Expand Up @@ -130,6 +131,25 @@ async def post_add(
total_list += (x.strip() for x in targets.split())
if file:
total_list += (x.strip() for x in file.decode().split())

for task in total_list:
if not Classifier.is_supported(task):
binds = sorted(get_binds_that_can_be_disabled(), key=lambda bind: bind.identity.lower())

return csrf.csrf_form_template_response(
"add.jinja2",
{
"validation_message": f"{task} is not supported - Artemis supports domains, IPs or IP ranges",
"request": request,
"binds": binds,
"tasks": total_list,
"tag": tag or "",
"disabled_modules": disabled_modules,
"modules_disabled_by_default": Config.Miscellaneous.MODULES_DISABLED_BY_DEFAULT,
},
csrf_protect,
)

create_tasks(total_list, tag, disabled_modules)
if redirect:
return RedirectResponse("/", status_code=301)
Expand Down
39 changes: 16 additions & 23 deletions artemis/modules/classifier.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3
import ipaddress
import urllib.parse
from typing import List, Optional

from karton.core import Task
Expand Down Expand Up @@ -30,33 +29,27 @@ class Classifier(ArtemisBase):
def _sanitize(data: str) -> str:
data = data.lower()

if "hxxp://" in data or "hxxps://" in data or "[.]" in data:
raise RuntimeError(
"Defanged URL detected. If you really want to scan it, please provide it as a standard one."
)

# strip URL schemes
if "://" in data:
hostname = urllib.parse.urlparse(data).hostname
assert hostname is not None
data = hostname
# Strip leading and trailing dots (e.g. domain.com.)
data = data.strip(".")

# strip after slash
if "/" in data:
data = data.split("/")[0]
return data

# if contains '@', then split after
if "@" in data:
data = data.split("@")[1]
@staticmethod
def is_supported(data: str) -> bool:
if Classifier._to_ip_range(data):
return True

# split last ":" (port)
if ":" in data:
data = data.rsplit(":")[0]
try:
# if this doesn't throw then we have an IP address
ipaddress.ip_address(data)
return True
except ValueError:
pass

# Strip leading and trailing dots (e.g. domain.com.)
data = data.strip(".")
if is_domain(data):
return True

return data
return False

@staticmethod
def _classify(data: str) -> TaskType:
Expand Down
18 changes: 15 additions & 3 deletions templates/add.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@

<h1>Add targets</h1>
<form action="/add" method="post" class="w-100">
{% if validation_message %}
<div class="alert alert-danger" role="alert">
{{ validation_message }}
</div>
{% endif %}

<input type="hidden" name="csrf_token" value="{{ csrf_token }}" />

<div class="form-group mb-3">
<label class="form-label">Targets (separated with newlines)</label>
<textarea class="form-control" name="targets"></textarea>
<textarea class="form-control" name="targets">{% for task in tasks %}{{ task }}
{% endfor %}</textarea>
</div>
<div class="form-group mb-3">
<label class="form-label">Batch file (should contain one target per line)</label>
<input class="form-control btn" type="file" name="file">
</div>
<div class="form-group mb-3">
<label class="form-label">Tag</label>
<input type="text" class="form-control" name="tag" />
<input type="text" class="form-control" name="tag" value="{{ tag }}">
<small class="form-text text-muted">
You may provide any string here - it will be saved in the task results
in the database so that you can e.g. use the value when processing
Expand Down Expand Up @@ -57,7 +64,12 @@
{% for bind in binds %}
<div class="form-check col-md-4">
<label class="form-check-label">
<input class="form-check-input" type="checkbox" value="" name="module_enabled_{{ bind.identity }}" {% if bind.identity not in modules_disabled_by_default %}checked{% endif %}>
<input class="form-check-input" type="checkbox" value="" name="module_enabled_{{ bind.identity }}"
{% if disabled_modules %}
{% if bind.identity not in disabled_modules %}checked{% endif %}
{% else %}
{% if bind.identity not in modules_disabled_by_default %}checked{% endif %}
{% endif %}>
{{ bind.identity }}<br/>
<span class="small text-muted">{{ bind.info|dedent|render_markdown|safe }}</span>
</label>
Expand Down
33 changes: 23 additions & 10 deletions test/modules/test_classifer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,31 @@ def test_skipping_public_suffixes(self) -> None:

self.assertTasksEqual(results, [])

def test_support(self) -> None:
self.assertTrue(Classifier.is_supported("cert.pl"))
self.assertTrue(Classifier.is_supported("CERT.pl"))
self.assertFalse(Classifier.is_supported("https://CERT.pl"))
self.assertFalse(Classifier.is_supported("http://cert.pl"))
self.assertFalse(Classifier.is_supported("cert.pl:8080"))
self.assertFalse(Classifier.is_supported("ws://cert.pl"))
self.assertFalse(Classifier.is_supported("[email protected]"))
self.assertFalse(Classifier.is_supported("ssh://127.0.0.1"))
self.assertFalse(Classifier.is_supported("127.0.0.1:8080"))

def test_parsing(self) -> None:
urls = [
TestData("https://CERT.pl", [ExpectedTaskData(TaskType.DOMAIN, "cert.pl")]),
TestData("https://cert.pl", [ExpectedTaskData(TaskType.DOMAIN, "cert.pl")]),
TestData("http://cert.pl", [ExpectedTaskData(TaskType.DOMAIN, "cert.pl")]),
TestData("cert.pl", [ExpectedTaskData(TaskType.DOMAIN, "cert.pl")]),
TestData("cert.pl:8080", [ExpectedTaskData(TaskType.DOMAIN, "cert.pl")]),
TestData("ws://cert.pl", [ExpectedTaskData(TaskType.DOMAIN, "cert.pl")]),
TestData("[email protected]", [ExpectedTaskData(TaskType.DOMAIN, "cert.pl")]),
TestData("ssh://cert.pl", [ExpectedTaskData(TaskType.DOMAIN, "cert.pl")]),
TestData("ssh://127.0.0.1", [ExpectedTaskData(TaskType.IP, "127.0.0.1")]),
TestData("127.0.0.1:8080", [ExpectedTaskData(TaskType.IP, "127.0.0.1")]),
TestData(
"cert.pl",
[
ExpectedTaskData(TaskType.DOMAIN, "cert.pl"),
],
),
TestData(
"CERT.pl",
[
ExpectedTaskData(TaskType.DOMAIN, "cert.pl"),
],
),
TestData(
"127.0.0.1-127.0.0.5",
[
Expand Down

0 comments on commit ec4c763

Please sign in to comment.