Skip to content

Commit

Permalink
[AAP-37476]: Validate DE Urls (ansible#1188)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex <[email protected]>
  • Loading branch information
zkayyali812 and Alex-Izquierdo authored Jan 17, 2025
1 parent ce08de5 commit b5d1035
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 36 deletions.
5 changes: 2 additions & 3 deletions src/aap_eda/api/serializers/decision_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ class DecisionEnvironmentCreateSerializer(serializers.ModelSerializer):

def validate(self, data):
eda_credential_id = data.get("eda_credential_id")
if eda_credential_id:
image_url = data.get("image_url") or self.instance.image_url
validators.check_if_de_valid(image_url, eda_credential_id)
image_url = data.get("image_url") or self.instance.image_url
validators.check_if_de_valid(image_url, eda_credential_id)

return data

Expand Down
76 changes: 43 additions & 33 deletions src/aap_eda/core/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ def check_if_de_exists(decision_environment_id: int) -> int:
return decision_environment_id


def check_if_de_valid(image_url: str, eda_credential_id: int):
def check_if_de_valid(
image_url: str, eda_credential_id: tp.Optional[int] = None
):
# The OCI standard format for the image url is a combination of a host
# (with optional port) separated from the image path (with optional tag) by
# a slash: <host>[:port]/<path>[:tag].
Expand Down Expand Up @@ -102,8 +104,14 @@ def check_if_de_valid(image_url: str, eda_credential_id: int):
% {"image_url": image_url}
)

split = path.split(":", 1)
name = split[0]
digest = False
if "@sha256" in path or "@sha512" in path:
split = path.split("@", 1)
name = split[0]
digest = True
else:
split = path.split(":", 1)
name = split[0]
# Get the tag sans any additional content. Any additional content
# is passed without validation.
tag = split[1] if (len(split) > 1) else None
Expand All @@ -121,7 +129,7 @@ def check_if_de_valid(image_url: str, eda_credential_id: int):
% {"image_url": image_url, "name": name}
)

if (tag is not None) and (
if (not digest and tag is not None) and (
not re.fullmatch(r"[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}", tag)
):
raise serializers.ValidationError(
Expand All @@ -132,38 +140,40 @@ def check_if_de_valid(image_url: str, eda_credential_id: int):
% {"image_url": image_url, "tag": tag}
)

credential = get_credential_if_exists(eda_credential_id)
inputs = yaml.safe_load(credential.inputs.get_secret_value())
credential_host = inputs.get("host")
if eda_credential_id:
credential = get_credential_if_exists(eda_credential_id)
inputs = yaml.safe_load(credential.inputs.get_secret_value())
credential_host = inputs.get("host")

if not credential_host:
raise serializers.ValidationError(
_("Credential %(name)s needs to have host information")
% {"name": credential.name}
)

# Check that the host matches the credential host.
# For backward compatibility when creating a new DE with an old credential
# we need to separate any scheme from the host before doing the compare.
parsed_credential_host = urllib.parse.urlparse(credential_host)
# If there's a netloc that's the host to use; if not, it's the path if
# there is no scheme else it's the scheme and path joined by a colon.
if parsed_credential_host.netloc:
parsed_host = parsed_credential_host.netloc
else:
parsed_host = parsed_credential_host.path
if parsed_credential_host.scheme:
parsed_host = ":".join(
[parsed_credential_host.scheme, parsed_host]
if not credential_host:
raise serializers.ValidationError(
_("Credential %(name)s needs to have host information")
% {"name": credential.name}
)

if host != parsed_host:
msg = _(
"DecisionEnvironment image url: %(image_url)s does "
"not match with the credential host: %(host)s"
) % {"image_url": image_url, "host": credential_host}

raise serializers.ValidationError(msg)
# Check that the host matches the credential host.
# For backward compatibility when creating a new DE with
# an old credential we need to separate any
# scheme from the host before doing the compare.
parsed_credential_host = urllib.parse.urlparse(credential_host)
# If there's a netloc that's the host to use; if not, it's the path if
# there is no scheme else it's the scheme and path joined by a colon.
if parsed_credential_host.netloc:
parsed_host = parsed_credential_host.netloc
else:
parsed_host = parsed_credential_host.path
if parsed_credential_host.scheme:
parsed_host = ":".join(
[parsed_credential_host.scheme, parsed_host]
)

if host != parsed_host:
msg = _(
"DecisionEnvironment image url: %(image_url)s does "
"not match with the credential host: %(host)s"
) % {"image_url": image_url, "host": credential_host}

raise serializers.ValidationError(msg)


def get_credential_if_exists(eda_credential_id: int) -> models.EdaCredential:
Expand Down
142 changes: 142 additions & 0 deletions tests/integration/api/test_decision_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,148 @@ def test_create_decision_environment_with_empty_credential(
assert status_message in str(errors)


de_requests = [
(
True,
"1.2.3.4/group/img1",
"",
),
(
True,
"registry.com/group/img1",
"",
),
(
True,
"registry.com/group/img1:latest",
"",
),
(
True,
"registry.com/group/img1@sha256:6e8985d6c50cf2eb577f17237ef9c05baa9c2f472a730f13784728cec1fdfab1", # noqa: E501
"",
),
(
True,
"registry.com/group/img1@sha512:6e8985d6c50cf2eb577f17237ef9c05baa9c2f472a730f13784728cec1fdfab1", # noqa: E501
"",
),
(
False,
"https://registry.com/group/img1:latest",
"",
),
(
False,
"registry.com/",
"no image path found",
),
(
False,
"registry.com/group/:tag",
"'group/' does not match OCI name standard",
),
(
False,
"registry.com/group/img1:",
"'' does not match OCI tag standard",
),
(
False,
"registry.com/group/bad^img1",
"'group/bad^img1' does not match OCI name standard",
),
(
False,
"registry.com/group/img1:bad^tag",
"'bad^tag' does not match OCI tag standard",
),
(
False,
"registry.com:5000/group/img1:bad^tag@additional-content",
"'bad^tag' does not match OCI tag standard",
),
]


@pytest.mark.parametrize(
("expected_success", "image_url", "unallowed"),
de_requests,
)
@pytest.mark.django_db
def test_create_decision_environment_with_no_credential(
expected_success,
image_url,
unallowed,
default_organization: models.Organization,
admin_client: APIClient,
preseed_credential_types,
):
data_in = {
"name": "de1",
"description": "desc here",
"image_url": image_url,
"organization_id": default_organization.id,
}
response = admin_client.post(
f"{api_url_v1}/decision-environments/", data=data_in
)
return_code = status.HTTP_400_BAD_REQUEST
if expected_success:
return_code = status.HTTP_201_CREATED

assert response.status_code == return_code
if return_code == status.HTTP_400_BAD_REQUEST:
errors = response.data.get("non_field_errors")
assert f"Image url {image_url} is malformed; {unallowed}" in str(
errors
)


@pytest.mark.parametrize(
("expected_success", "image_url", "unallowed"),
de_requests,
)
@pytest.mark.django_db
def test_patch_decision_environment_with_no_credential(
expected_success,
image_url,
unallowed,
default_organization: models.Organization,
admin_client: APIClient,
preseed_credential_types,
):
data_in = {
"name": "de1",
"description": "desc here",
"image_url": "quay.io/test/test:latest",
"organization_id": default_organization.id,
}
response = admin_client.post(
f"{api_url_v1}/decision-environments/", data=data_in
)
assert response.status_code == status.HTTP_201_CREATED
data_in = {
"name": "de1",
"description": "desc here",
"image_url": image_url,
"organization_id": default_organization.id,
}
response = admin_client.patch(
f"{api_url_v1}/decision-environments/" f"{response.data['id']}/",
data=data_in,
)
return_code = status.HTTP_400_BAD_REQUEST
if expected_success:
return_code = status.HTTP_200_OK
assert response.status_code == return_code
if return_code == status.HTTP_400_BAD_REQUEST:
errors = response.data.get("non_field_errors")
assert f"Image url {image_url} is malformed; {unallowed}" in str(
errors
)


@pytest.mark.django_db
def test_create_decision_environment_with_none_organization(
admin_client: APIClient,
Expand Down

0 comments on commit b5d1035

Please sign in to comment.