diff --git a/changelogs/fragments/643-extend-hidden-fields.yaml b/changelogs/fragments/643-extend-hidden-fields.yaml new file mode 100644 index 0000000000..94e206e6cb --- /dev/null +++ b/changelogs/fragments/643-extend-hidden-fields.yaml @@ -0,0 +1,3 @@ +--- +minor_changes: +- k8s, k8s_info Extend hidden_fields to allow the expression of more complex field types to be hidden (https://github.com/ansible-collections/kubernetes.core/pull/643) diff --git a/plugins/module_utils/k8s/service.py b/plugins/module_utils/k8s/service.py index 342ee44149..31c76bc868 100644 --- a/plugins/module_utils/k8s/service.py +++ b/plugins/module_utils/k8s/service.py @@ -4,7 +4,7 @@ import copy from json import loads from re import compile -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union from ansible.module_utils.common.dict_transformations import dict_merge from ansible_collections.kubernetes.core.plugins.module_utils.hashes import ( @@ -501,47 +501,107 @@ def diff_objects( result["before"] = diff[0] result["after"] = diff[1] - if list(result["after"].keys()) != ["metadata"] or list( + if list(result["after"].keys()) == ["metadata"] and list( result["before"].keys() - ) != ["metadata"]: - return False, result + ) == ["metadata"]: + # If only metadata.generation and metadata.resourceVersion changed, ignore it + ignored_keys = set(["generation", "resourceVersion"]) - # If only metadata.generation and metadata.resourceVersion changed, ignore it - ignored_keys = set(["generation", "resourceVersion"]) - - if not set(result["after"]["metadata"].keys()).issubset(ignored_keys): - return False, result - if not set(result["before"]["metadata"].keys()).issubset(ignored_keys): - return False, result + if set(result["after"]["metadata"].keys()).issubset(ignored_keys) and set( + result["before"]["metadata"].keys() + ).issubset(ignored_keys): + return True, result result["before"] = hide_fields(result["before"], hidden_fields) result["after"] = hide_fields(result["after"], hidden_fields) - return True, result + return False, result -def hide_fields(definition: dict, hidden_fields: Optional[list]) -> dict: - if not hidden_fields: - return definition - result = copy.deepcopy(definition) - for hidden_field in hidden_fields: - result = hide_field(result, hidden_field) +def hide_field_tree(hidden_field: str) -> List[str]: + result = [] + key, rest = hide_field_split2(hidden_field) + result.append(key) + while rest: + key, rest = hide_field_split2(rest) + result.append(key) + return result -# hide_field is not hugely sophisticated and designed to cope -# with e.g. status or metadata.managedFields rather than e.g. -# spec.template.spec.containers[0].env[3].value -def hide_field(definition: dict, hidden_field: str) -> dict: - split = hidden_field.split(".", 1) - if split[0] in definition: - if len(split) == 2: - definition[split[0]] = hide_field(definition[split[0]], split[1]) - else: - del definition[split[0]] +def build_hidden_field_tree(hidden_fields: List[str]) -> Dict[str, Any]: + """Group hidden field targeting the same json key + Example: + Input: ['env[3]', 'env[0]'] + Output: {'env': [0, 3]} + """ + output = {} + for hidden_field in hidden_fields: + current = output + tree = hide_field_tree(hidden_field) + for idx, key in enumerate(tree): + if current.get(key, "") is None: + break + if idx == (len(tree) - 1): + current[key] = None + elif key not in current: + current[key] = {} + current = current[key] + return output + + +# hide_field should be able to cope with simple or more complicated +# field definitions +# e.g. status or metadata.managedFields or +# spec.template.spec.containers[0].env[3].value or +# metadata.annotations[kubectl.kubernetes.io/last-applied-configuration] +def hide_field( + definition: Union[Dict[str, Any], List[Any]], hidden_field: Dict[str, Any] +) -> Dict[str, Any]: + def dict_contains_key(obj: Dict[str, Any], key: str) -> bool: + return key in obj + + def list_contains_key(obj: List[Any], key: str) -> bool: + return int(key) < len(obj) + + hidden_keys = list(hidden_field.keys()) + field_contains_key = dict_contains_key + field_get_key = str + if isinstance(definition, list): + # Sort with reverse=true so that when we delete an item from the list, the order is not changed + hidden_keys = sorted( + [k for k in hidden_field.keys() if k.isdecimal()], reverse=True + ) + field_contains_key = list_contains_key + field_get_key = int + + for key in hidden_keys: + if field_contains_key(definition, key): + value = hidden_field.get(key) + convert_key = field_get_key(key) + if value is None: + del definition[convert_key] + else: + definition[convert_key] = hide_field(definition[convert_key], value) + if ( + definition[convert_key] == dict() + or definition[convert_key] == list() + ): + del definition[convert_key] + return definition +def hide_fields( + definition: Dict[str, Any], hidden_fields: Optional[List[str]] +) -> Dict[str, Any]: + if not hidden_fields: + return definition + result = copy.deepcopy(definition) + hidden_field_tree = build_hidden_field_tree(hidden_fields) + return hide_field(result, hidden_field_tree) + + def decode_response(resp) -> Tuple[Dict, List[str]]: """ This function decodes unserialized responses from the Kubernetes python @@ -620,3 +680,35 @@ def parse_quoted_string(quoted_string: str) -> Tuple[str, str]: raise ValueError("invalid quoted string: missing closing quote") return "".join(result), remainder + + +# hide_field_split2 returns the first key in hidden_field and the rest of the hidden_field +# We expect the first key to either be in brackets, to be terminated by the start of a left +# bracket, or to be terminated by a dot. + +# examples would be: +# field.another.next -> (field, another.next) +# field[key].value -> (field, [key].value) +# [key].value -> (key, value) +# [one][two] -> (one, [two]) + + +def hide_field_split2(hidden_field: str) -> Tuple[str, str]: + lbracket = hidden_field.find("[") + rbracket = hidden_field.find("]") + dot = hidden_field.find(".") + + if lbracket == 0: + # skip past right bracket and any following dot + rest = hidden_field[rbracket + 1 :] # noqa: E203 + if rest and rest[0] == ".": + rest = rest[1:] + return (hidden_field[lbracket + 1 : rbracket], rest) # noqa: E203 + + if lbracket != -1 and (dot == -1 or lbracket < dot): + return (hidden_field[:lbracket], hidden_field[lbracket:]) + + split = hidden_field.split(".", 1) + if len(split) == 1: + return split[0], "" + return split diff --git a/plugins/modules/k8s.py b/plugins/modules/k8s.py index ea7440cfe2..a0a28542bd 100644 --- a/plugins/modules/k8s.py +++ b/plugins/modules/k8s.py @@ -188,7 +188,8 @@ description: - Hide fields matching this option in the result - An example might be C(hidden_fields=[metadata.managedFields]) - - Only field definitions that don't reference list items are supported (so V(spec.containers[0]) would not work) + or C(hidden_fields=[spec.containers[0].env[3].value]) + or C(hidden_fields=[metadata.annotations[kubectl.kubernetes.io/last-applied-configuration]]) type: list elements: str version_added: 3.0.0 diff --git a/plugins/modules/k8s_info.py b/plugins/modules/k8s_info.py index e315fe0944..cb822531eb 100644 --- a/plugins/modules/k8s_info.py +++ b/plugins/modules/k8s_info.py @@ -48,7 +48,8 @@ description: - Hide fields matching any of the field definitions in the result - An example might be C(hidden_fields=[metadata.managedFields]) - - Only field definitions that don't reference list items are supported (so V(spec.containers[0]) would not work) + or C(hidden_fields=[spec.containers[0].env[3].value]) + or C(hidden_fields=[metadata.annotations[kubectl.kubernetes.io/last-applied-configuration]]) type: list elements: str version_added: 3.0.0 diff --git a/tests/integration/targets/k8s_hide_fields/tasks/main.yml b/tests/integration/targets/k8s_hide_fields/tasks/main.yml index 4b361fb96b..f54fe9eb6a 100644 --- a/tests/integration/targets/k8s_hide_fields/tasks/main.yml +++ b/tests/integration/targets/k8s_hide_fields/tasks/main.yml @@ -77,6 +77,7 @@ definition: "{{ hide_fields_base_configmap | combine({'data':{'anew':'value'}}) }}" hidden_fields: - data + - metadata.annotations[kubectl.kubernetes.io/last-applied-configuration] apply: true register: hf6 diff: true @@ -86,6 +87,22 @@ that: - hf6.changed + - name: Ensure hidden fields are not present + assert: + that: + - >- + 'annotations' not in hf6.result.metadata or + 'kubectl.kubernetes.io/last-applied-configuration' + not in hf6.result.metadata.annotations + - >- + 'annotations' not in hf6.diff.before.metadata or + 'kubectl.kubernetes.io/last-applied-configuration' + not in hf6.diff.before.metadata.annotations + - >- + 'annotations' not in hf6.diff.after.metadata or + 'kubectl.kubernetes.io/last-applied-configuration' + not in hf6.diff.after.metadata.annotations + - name: Hidden field should not show up in deletion k8s: definition: "{{ hide_fields_base_configmap}}" diff --git a/tests/unit/module_utils/test_hide_fields.py b/tests/unit/module_utils/test_hide_fields.py new file mode 100644 index 0000000000..6377c97d99 --- /dev/null +++ b/tests/unit/module_utils/test_hide_fields.py @@ -0,0 +1,253 @@ +import pytest +from ansible_collections.kubernetes.core.plugins.module_utils.k8s.service import ( + build_hidden_field_tree, + hide_fields, +) + + +def test_hiding_missing_field_does_nothing(): + output = dict( + kind="ConfigMap", metadata=dict(name="foo"), data=dict(one="1", two="2") + ) + hidden_fields = ["doesnotexist"] + assert hide_fields(output, hidden_fields) == output + + +def test_hiding_simple_field(): + output = dict( + kind="ConfigMap", metadata=dict(name="foo"), data=dict(one="1", two="2") + ) + hidden_fields = ["metadata"] + expected = dict(kind="ConfigMap", data=dict(one="1", two="2")) + assert hide_fields(output, hidden_fields) == expected + + +def test_hiding_only_key_in_dict_removes_dict(): + output = dict(kind="ConfigMap", metadata=dict(name="foo"), data=dict(one="1")) + hidden_fields = ["data.one"] + expected = dict(kind="ConfigMap", metadata=dict(name="foo")) + assert hide_fields(output, hidden_fields) == expected + + +def test_hiding_all_keys_in_dict_removes_dict(): + output = dict( + kind="ConfigMap", metadata=dict(name="foo"), data=dict(one="1", two="2") + ) + hidden_fields = ["data.one", "data.two"] + expected = dict(kind="ConfigMap", metadata=dict(name="foo")) + assert hide_fields(output, hidden_fields) == expected + + +def test_hiding_multiple_fields(): + output = dict( + kind="ConfigMap", metadata=dict(name="foo"), data=dict(one="1", two="2") + ) + hidden_fields = ["metadata", "data.one"] + expected = dict(kind="ConfigMap", data=dict(two="2")) + assert hide_fields(output, hidden_fields) == expected + + +def test_hiding_dict_key(): + output = dict( + kind="ConfigMap", + metadata=dict( + name="foo", + annotations={ + "kubectl.kubernetes.io/last-applied-configuration": '{"testvalue"}' + }, + ), + data=dict(one="1", two="2"), + ) + hidden_fields = [ + "metadata.annotations[kubectl.kubernetes.io/last-applied-configuration]", + ] + expected = dict( + kind="ConfigMap", metadata=dict(name="foo"), data=dict(one="1", two="2") + ) + assert hide_fields(output, hidden_fields) == expected + + +def test_hiding_list_value_key(): + output = dict( + kind="Pod", + metadata=dict(name="foo"), + spec=dict( + containers=[ + dict( + name="containers", + image="busybox", + env=[ + dict(name="ENV1", value="env1"), + dict(name="ENV2", value="env2"), + dict(name="ENV3", value="env3"), + ], + ) + ] + ), + ) + hidden_fields = ["spec.containers[0].env[1].value"] + expected = dict( + kind="Pod", + metadata=dict(name="foo"), + spec=dict( + containers=[ + dict( + name="containers", + image="busybox", + env=[ + dict(name="ENV1", value="env1"), + dict(name="ENV2"), + dict(name="ENV3", value="env3"), + ], + ) + ] + ), + ) + assert hide_fields(output, hidden_fields) == expected + + +def test_hiding_last_list_item(): + output = dict( + kind="Pod", + metadata=dict(name="foo"), + spec=dict( + containers=[ + dict( + name="containers", + image="busybox", + env=[ + dict(name="ENV1", value="env1"), + ], + ) + ] + ), + ) + hidden_fields = ["spec.containers[0].env[0]"] + expected = dict( + kind="Pod", + metadata=dict(name="foo"), + spec=dict( + containers=[ + dict( + name="containers", + image="busybox", + ) + ] + ), + ) + assert hide_fields(output, hidden_fields) == expected + + +def test_hiding_nested_dicts_using_brackets(): + output = dict( + kind="Pod", + metadata=dict(name="foo"), + spec=dict( + containers=[ + dict( + name="containers", + image="busybox", + securityContext=dict(runAsUser=101), + ) + ] + ), + ) + hidden_fields = ["spec.containers[0][securityContext][runAsUser]"] + expected = dict( + kind="Pod", + metadata=dict(name="foo"), + spec=dict( + containers=[ + dict( + name="containers", + image="busybox", + ) + ] + ), + ) + if hide_fields(output, hidden_fields) != expected: + print(output) + print(expected) + assert hide_fields(output, hidden_fields) == expected + + +def test_using_jinja_syntax(): + output = dict( + kind="ConfigMap", metadata=dict(name="foo"), data=["0", "1", "2", "3"] + ) + hidden_fields = ["data.2"] + expected = dict(kind="ConfigMap", metadata=dict(name="foo"), data=["0", "1", "3"]) + assert hide_fields(output, hidden_fields) == expected + + +def test_remove_multiple_items_from_list(): + output = dict( + kind="ConfigMap", metadata=dict(name="foo"), data=["0", "1", "2", "3"] + ) + hidden_fields = ["data[0]", "data[2]"] + expected = dict(kind="ConfigMap", metadata=dict(name="foo"), data=["1", "3"]) + assert hide_fields(output, hidden_fields) == expected + + +def test_hide_dict_and_nested_dict(): + output = { + "kind": "Pod", + "metadata": { + "labels": { + "control-plane": "controller-manager", + "pod-template-hash": "687b856498", + }, + "annotations": { + "kubectl.kubernetes.io/default-container": "awx-manager", + "creationTimestamp": "2025-01-16T12:40:43Z", + }, + }, + } + hidden_fields = ["metadata.labels.pod-template-hash", "metadata.labels"] + expected = { + "kind": "Pod", + "metadata": { + "annotations": { + "kubectl.kubernetes.io/default-container": "awx-manager", + "creationTimestamp": "2025-01-16T12:40:43Z", + } + }, + } + assert hide_fields(output, hidden_fields) == expected + + +@pytest.mark.parametrize( + "hidden_fields,expected", + [ + ( + [ + "data[0]", + "data[1]", + "metadata.annotation", + "metadata.annotation[0].name", + ], + {"data": {"0": None, "1": None}, "metadata": {"annotation": None}}, + ), + ( + [ + "data[0]", + "data[1]", + "metadata.annotation[0].name", + "metadata.annotation", + ], + {"data": {"0": None, "1": None}, "metadata": {"annotation": None}}, + ), + ( + [ + "data[0]", + "data[1]", + "data", + "metadata.annotation[0].name", + "metadata.annotation", + ], + {"data": None, "metadata": {"annotation": None}}, + ), + ], +) +def test_build_hidden_field_tree(hidden_fields, expected): + assert build_hidden_field_tree(hidden_fields) == expected