Skip to content

Commit

Permalink
fixing some corner case for hide_fields method
Browse files Browse the repository at this point in the history
  • Loading branch information
abikouo committed Feb 13, 2025
1 parent 9aa518b commit ac4781d
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 28 deletions.
92 changes: 66 additions & 26 deletions plugins/module_utils/k8s/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import copy
from json import loads
from re import compile
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Union

from ansible.module_utils.common.dict_transformations import dict_merge
from ansible_collections.kubernetes.core.plugins.module_utils.hashes import (
Expand Down Expand Up @@ -518,45 +518,85 @@ def diff_objects(
return False, result


def hide_fields(definition: dict, hidden_fields: Optional[list]) -> dict:
if not hidden_fields:
return definition
result = copy.deepcopy(definition)
for hidden_field in hidden_fields:
result = hide_field(result, hidden_field)
def hide_field_tree(hidden_field: str) -> List[str]:
result = []
key, rest = hide_field_split2(hidden_field)
result.append(key)
while rest:
key, rest = hide_field_split2(rest)
result.append(key)

return result


def build_hidden_field_tree(hidden_fields: List[str]) -> Dict[str, Any]:
output = {}
for hidden_field in hidden_fields:
current = output
tree = hide_field_tree(hidden_field)
for idx, key in enumerate(tree):
if current.get(key, "") is None:
break
if idx == (len(tree) - 1):
current[key] = None
elif key not in current:
current[key] = {}
current = current[key]
return output


# hide_field should be able to cope with simple or more complicated
# field definitions
# e.g. status or metadata.managedFields or
# spec.template.spec.containers[0].env[3].value or
# metadata.annotations[kubectl.kubernetes.io/last-applied-configuration]
def hide_field(definition: dict, hidden_field: str) -> dict:
def dict_contains_key(field: dict, key: str) -> bool:
return key in field
def hide_field(
definition: Union[Dict[str, Any], List[Any]], hidden_field: Dict[str, Any]
) -> Dict[str, Any]:
def dict_contains_key(obj: Dict[str, Any], key: str) -> bool:
return key in obj

def list_contains_key(field: list, key: str) -> bool:
return key < len(field)
def list_contains_key(obj: List[Any], key: str) -> bool:
return int(key) < len(obj)

hidden_keys = list(hidden_field.keys())
field_contains_key = dict_contains_key

(key, rest) = hide_field_split2(hidden_field)

if key.isdecimal():
key = int(key)
field_get_key = str
if isinstance(definition, list):
# Sort with reverse=true so that when we delete an item from the list, the order is not changed
hidden_keys = sorted(
[k for k in hidden_field.keys() if k.isdecimal()], reverse=True
)
field_contains_key = list_contains_key
if field_contains_key(definition, key):
if rest:
definition[key] = hide_field(definition[key], rest)
# remove empty dicts and lists from the result
if definition[key] == dict() or definition[key] == list():
del definition[key]
else:
del definition[key]
field_get_key = int

for key in hidden_keys:
if field_contains_key(definition, key):
value = hidden_field.get(key)
convert_key = field_get_key(key)
if value is None:
del definition[convert_key]
else:
definition[convert_key] = hide_field(definition[convert_key], value)
if (
definition[convert_key] == dict()
or definition[convert_key] == list()
):
del definition[convert_key]

return definition


def hide_fields(
definition: Dict[str, Any], hidden_fields: Optional[List[str]]
) -> Dict[str, Any]:
if not hidden_fields:
return definition
result = copy.deepcopy(definition)
hidden_field_tree = build_hidden_field_tree(hidden_fields)
return hide_field(result, hidden_field_tree)


def decode_response(resp) -> Tuple[Dict, List[str]]:
"""
This function decodes unserialized responses from the Kubernetes python
Expand Down Expand Up @@ -648,7 +688,7 @@ def parse_quoted_string(quoted_string: str) -> Tuple[str, str]:
# [one][two] -> (one, [two])


def hide_field_split2(hidden_field: str) -> (str, str):
def hide_field_split2(hidden_field: str) -> Tuple[str, str]:
lbracket = hidden_field.find("[")
rbracket = hidden_field.find("]")
dot = hidden_field.find(".")
Expand Down
86 changes: 84 additions & 2 deletions tests/unit/module_utils/test_hide_fields.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.service import (
hide_fields,
)
build_hidden_field_tree, hide_fields)


def test_hiding_missing_field_does_nothing():
Expand Down Expand Up @@ -167,3 +167,85 @@ def test_hiding_nested_dicts_using_brackets():
print(output)
print(expected)
assert hide_fields(output, hidden_fields) == expected


def test_using_jinja_syntax():
output = dict(
kind="ConfigMap", metadata=dict(name="foo"), data=["0", "1", "2", "3"]
)
hidden_fields = ["data.2"]
expected = dict(kind="ConfigMap", metadata=dict(name="foo"), data=["0", "1", "3"])
assert hide_fields(output, hidden_fields) == expected


def test_remove_multiple_items_from_list():
output = dict(
kind="ConfigMap", metadata=dict(name="foo"), data=["0", "1", "2", "3"]
)
hidden_fields = ["data[0]", "data[2]"]
expected = dict(kind="ConfigMap", metadata=dict(name="foo"), data=["1", "3"])
assert hide_fields(output, hidden_fields) == expected


def test_hide_dict_and_nested_dict():
output = {
"kind": "Pod",
"metadata": {
"labels": {
"control-plane": "controller-manager",
"pod-template-hash": "687b856498",
},
"annotations": {
"kubectl.kubernetes.io/default-container": "awx-manager",
"creationTimestamp": "2025-01-16T12:40:43Z",
},
},
}
hidden_fields = ["metadata.labels.pod-template-hash", "metadata.labels"]
expected = {
"kind": "Pod",
"metadata": {
"annotations": {
"kubectl.kubernetes.io/default-container": "awx-manager",
"creationTimestamp": "2025-01-16T12:40:43Z",
}
},
}
assert hide_fields(output, hidden_fields) == expected


@pytest.mark.parametrize(
"hidden_fields,expected",
[
(
[
"data[0]",
"data[1]",
"metadata.annotation",
"metadata.annotation[0].name",
],
{"data": {"0": None, "1": None}, "metadata": {"annotation": None}},
),
(
[
"data[0]",
"data[1]",
"metadata.annotation[0].name",
"metadata.annotation",
],
{"data": {"0": None, "1": None}, "metadata": {"annotation": None}},
),
(
[
"data[0]",
"data[1]",
"data",
"metadata.annotation[0].name",
"metadata.annotation",
],
{"data": None, "metadata": {"annotation": None}},
),
],
)
def test_build_hidden_field_tree(hidden_fields, expected):
assert build_hidden_field_tree(hidden_fields) == expected

0 comments on commit ac4781d

Please sign in to comment.