From 84ac2f343aac24c12c67ad7f61a0768321a43048 Mon Sep 17 00:00:00 2001 From: Enrico Deusebio Date: Wed, 17 Jan 2024 02:13:24 +0100 Subject: [PATCH 1/2] [DPE-3343] spark8t support for incluster use --- spark8t/cli/params.py | 10 +- spark8t/cli/service_account_registry.py | 4 +- spark8t/domain.py | 16 +- spark8t/services.py | 246 +++++++------------- tests/integration/conftest.py | 2 +- tests/integration/test_registry.py | 7 +- tests/unittest/test_domain.py | 17 ++ tests/unittest/test_services.py | 284 ++++++++++-------------- 8 files changed, 241 insertions(+), 345 deletions(-) diff --git a/spark8t/cli/params.py b/spark8t/cli/params.py index b33c317..7180930 100644 --- a/spark8t/cli/params.py +++ b/spark8t/cli/params.py @@ -128,12 +128,10 @@ def add_deploy_arguments(parser: ArgumentParser) -> ArgumentParser: def get_kube_interface(args: Namespace) -> AbstractKubeInterface: - return ( - LightKube(args.kubeconfig or defaults.kube_config, defaults) - if args.backend == "lightkube" - else KubeInterface( - args.kubeconfig or defaults.kube_config, context_name=args.context - ) + _class = LightKube if args.backend == "lightkube" else KubeInterface + + return _class( + args.kubeconfig or defaults.kube_config, defaults, context_name=args.context ) diff --git a/spark8t/cli/service_account_registry.py b/spark8t/cli/service_account_registry.py index 70aa159..1a5eda7 100644 --- a/spark8t/cli/service_account_registry.py +++ b/spark8t/cli/service_account_registry.py @@ -106,7 +106,9 @@ def main(args: Namespace, logger: Logger): logger.debug(f"Using K8s context: {context}") - registry = K8sServiceAccountRegistry(kube_interface.with_context(context)) + registry = K8sServiceAccountRegistry( + kube_interface.with_context(context) if context else kube_interface + ) if args.action == Actions.CREATE: service_account = build_service_account_from_args(args, registry) diff --git a/spark8t/domain.py b/spark8t/domain.py index 8efef7a..18ad18c 100644 --- a/spark8t/domain.py +++ b/spark8t/domain.py @@ -3,7 +3,7 @@ import re from dataclasses import dataclass from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from spark8t.utils import WithLogging, union @@ -211,6 +211,13 @@ def spark_home(self): def spark_confs(self): return self.environ.get("SPARK_CONFS", os.path.join(self.spark_home, "conf")) + @property + def kubernetes_api(self): + return ( + f"https://{self.environ['KUBERNETES_SERVICE_HOST']}:" + + f"{self.environ['KUBERNETES_SERVICE_PORT']}" + ) + @property def spark_user_data(self): return self.environ["SPARK_USER_DATA"] @@ -221,9 +228,10 @@ def kubectl_cmd(self) -> str: return self.environ.get("SPARK_KUBECTL", "kubectl") @property - def kube_config(self) -> str: - """Return default kubeconfig to use if not explicitly provided.""" - return self.environ["KUBECONFIG"] + def kube_config(self) -> Union[None, str]: + """Return default kubeconfig to use if provided in env variable.""" + filename = self.environ.get("KUBECONFIG", None) + return filename if filename else None @property def static_conf_file(self) -> str: diff --git a/spark8t/services.py b/spark8t/services.py index 9de5988..3d20302 100644 --- a/spark8t/services.py +++ b/spark8t/services.py @@ -12,7 +12,7 @@ from typing import Any, Dict, List, Optional, Type, Union import yaml -from lightkube import Client, KubeConfig, codecs +from lightkube import Client, KubeConfig, SingleConfig, codecs from lightkube.core.exceptions import ApiError from lightkube.core.resource import GlobalResource from lightkube.models.meta_v1 import ObjectMeta @@ -49,72 +49,71 @@ class AbstractKubeInterface(WithLogging, metaclass=ABCMeta): """Abstract class for implementing Kubernetes Interface.""" - @abstractmethod - def with_context(self, context_name: str): + defaults: Defaults + kube_config_file: Union[None, str, Dict[str, Any]] + context_name: Optional[str] = None + + def __init__( + self, + kube_config_file: Union[None, str, Dict[str, Any]], + defaults: Defaults, + context_name: Optional[str] = None, + ): + """Initialise a KubeInterface class from a kube config file. + + Args: + kube_config_file: kube config path + context_name: name of the context to be used + """ + self.kube_config_file = kube_config_file + self.defaults = defaults + self._context_name = context_name + + def with_context(self, context_name: str) -> "AbstractKubeInterface": """Return a new KubeInterface object using a different context. Args: context_name: context to be used """ - pass - - @property - @abstractmethod - def kube_config_file(self) -> Union[str, Dict[str, Any]]: - pass + return type(self)(self.kube_config_file, self.defaults, context_name) @cached_property - def kube_config(self) -> Dict[str, Any]: + def kube_config(self) -> KubeConfig: """Return the kube config file parsed as a dictionary""" + if not self.kube_config_file: + return KubeConfig.from_env() + if isinstance(self.kube_config_file, str): - with open(self.kube_config_file, "r") as fid: - return yaml.safe_load(fid) + return KubeConfig.from_file(self.kube_config_file) + elif isinstance(self.kube_config_file, dict): + return KubeConfig.from_dict(self.kube_config_file) else: - return self.kube_config_file + raise ValueError( + f"malformed kube_config: type {type(self.kube_config_file)}" + ) @cached_property - def available_contexts(self) -> List[str]: - """Return the available contexts present in the kube config file.""" - return [context["name"] for context in self.kube_config["contexts"]] - - @property - @abstractmethod def context_name(self) -> str: - """Return current context name.""" - pass + return self._context_name or self.kube_config.current_context @cached_property - def context(self) -> Dict[str, str]: - """Return current context.""" - return [ - context["context"] - for context in self.kube_config["contexts"] - if context["name"] == self.context_name - ][0] - - @cached_property - def cluster(self) -> Dict: - """Return current cluster.""" - return [ - cluster["cluster"] - for cluster in self.kube_config["clusters"] - if cluster["name"] == self.context["cluster"] - ][0] + def single_config(self) -> SingleConfig: + return self.kube_config.get(self.context_name) @cached_property def api_server(self): """Return current K8s api-server endpoint.""" - return self.cluster["server"] + return self.single_config.cluster.server @cached_property def namespace(self): """Return current namespace.""" - return self.context.get("namespace", "default") + return self.single_config.context.namespace @cached_property def user(self): """Return current admin user.""" - return self.context.get("user", "default") + return self.single_config.context.user @abstractmethod def get_service_account( @@ -241,16 +240,15 @@ def exists( def select_by_master(self, master: str): api_servers_clusters = { - cluster["name"]: cluster["cluster"]["server"] - for cluster in self.kube_config["clusters"] + name: cluster.server for name, cluster in self.kube_config.clusters.items() } self.logger.debug(f"Clusters API: {dict(api_servers_clusters)}") contexts_for_api_server = [ - _context["name"] - for _context in self.kube_config["contexts"] - if api_servers_clusters[_context["context"]["cluster"]] == master + name + for name, context in self.kube_config.contexts.items() + if api_servers_clusters[context.cluster] == master ] if len(contexts_for_api_server) == 0: @@ -279,33 +277,9 @@ class LightKube(AbstractKubeInterface): } ) - def __init__( - self, - kube_config_file: Union[str, Dict[str, Any]], - defaults: Defaults, - context_name: Optional[str] = None, - ): - """Initialise a KubeInterface class from a kube config file. - - Args: - kube_config_file: kube config path - context_name: name of the context to be used - """ - self._kube_config_file = kube_config_file - self._context_name = context_name - self.config = KubeConfig.from_file(self.kube_config_file) - - self.defaults = defaults - - if context_name: - self.client = Client(config=self.config.get(context_name=context_name)) - else: - self.client = Client(config=self.config.get()) - - @property - def kube_config_file(self) -> Union[str, Dict[str, Any]]: - """Return the kube config file name""" - return self._kube_config_file + @cached_property + def client(self): + return Client(config=self.single_config) def with_context(self, context_name: str): """Return a new KubeInterface object using a different context. @@ -315,15 +289,6 @@ def with_context(self, context_name: str): """ return LightKube(self.kube_config_file, self.defaults, context_name) - @property - def context_name(self) -> str: - """Return current context name.""" - return ( - self.kube_config["current-context"] - if self._context_name is None - else self._context_name - ) - def get_service_account( self, account_id: str, namespace: Optional[str] = None ) -> Dict[str, Any]: @@ -670,36 +635,9 @@ def exists( class KubeInterface(AbstractKubeInterface): """Class for providing an interface for k8s API needed for the spark client.""" - def __init__( - self, - kube_config_file: Union[str, Dict[str, Any]], - context_name: Optional[str] = None, - kubectl_cmd: str = "kubectl", - ): - """Initialise a KubeInterface class from a kube config file. - - Args: - kube_config_file: kube config path - context_name: name of the context to be used - kubectl_cmd: path to the kubectl command to be used to interact with the K8s API - """ - self._kube_config_file = kube_config_file - self._context_name = context_name - self.kubectl_cmd = kubectl_cmd - - @property - def kube_config_file(self) -> Union[str, Dict[str, Any]]: - """Return the kube config file name""" - return self._kube_config_file - - @property - def context_name(self) -> str: - """Return current context name.""" - return ( - self.kube_config["current-context"] - if self._context_name is None - else self._context_name - ) + @cached_property + def kubectl_cmd(self): + return self.defaults.kubectl_cmd def with_context(self, context_name: str): """Return a new KubeInterface object using a different context. @@ -707,15 +645,7 @@ def with_context(self, context_name: str): Args: context_name: context to be used """ - return KubeInterface(self.kube_config_file, context_name, self.kubectl_cmd) - - def with_kubectl_cmd(self, kubectl_cmd: str): - """Return a new KubeInterface object using a different kubectl command. - - Args: - kubectl_cmd: path to the kubectl command to be used - """ - return KubeInterface(self.kube_config_file, self.context_name, kubectl_cmd) + return KubeInterface(self.kube_config_file, self.defaults, context_name) def exec( self, @@ -740,14 +670,18 @@ def exec( Output of the command, either parsed as yaml or string """ - base_cmd = f"{self.kubectl_cmd} --kubeconfig {self.kube_config_file} " + cmd_list = [self.kubectl_cmd] + if self.kube_config_file: + cmd_list += [f"--kubeconfig {self.kube_config_file}"] if namespace and "--namespace" not in cmd or "-n" not in cmd: - base_cmd += f" --namespace {namespace} " - if "--context" not in cmd: - base_cmd += f" --context {context or self.context_name} " + cmd_list += [f"--namespace {namespace}"] + if self.kube_config_file and "--context" not in cmd: + cmd_list += [f"--context {context or self.context_name}"] - base_cmd += f"{cmd} -o {output or 'yaml'} " + cmd_list += [cmd, f"-o {output or 'yaml'}"] + + base_cmd = " ".join(cmd_list) self.logger.debug(f"Executing command: {base_cmd}") @@ -766,10 +700,10 @@ def get_service_account( namespace: namespace where to look for the service account. Default is 'default' """ - cmd = f"get serviceaccount {account_id} -n {namespace}" + cmd = f"get serviceaccount {account_id}" try: - service_account_raw = self.exec(cmd, namespace=self.namespace) + service_account_raw = self.exec(cmd, namespace=namespace) except subprocess.CalledProcessError as e: if "NotFound" in e.stdout.decode("utf-8"): raise K8sResourceNotFound( @@ -942,51 +876,24 @@ def exists( @classmethod def autodetect( - cls, context_name: Optional[str] = None, kubectl_cmd: str = "kubectl" + cls, context_name: Optional[str] = None, defaults: Defaults = Defaults() ) -> "KubeInterface": """ Return a KubeInterface object by auto-parsing the output of the kubectl command. Args: context_name: context to be used to export the cluster configuration - kubectl_cmd: path to the kubectl command to be used to interact with the K8s API + defaults: defaults coming from env variable """ - cmd = kubectl_cmd + cmd = defaults.kubectl_cmd if context_name: cmd += f" --context {context_name}" - config = parse_yaml_shell_output(f"{cmd} config view --minify -o yaml") - - return KubeInterface(config, context_name=context_name, kubectl_cmd=kubectl_cmd) - - def select_by_master(self, master: str): - api_servers_clusters = { - cluster["name"]: cluster["cluster"]["server"] - for cluster in self.kube_config["clusters"] - } - - self.logger.debug(f"Clusters API: {dict(api_servers_clusters)}") - - contexts_for_api_server = [ - _context["name"] - for _context in self.kube_config["contexts"] - if api_servers_clusters[_context["context"]["cluster"]] == master - ] - - if len(contexts_for_api_server) == 0: - raise AccountNotFound(master) - - self.logger.info( - f"Contexts on api server {master}: {', '.join(contexts_for_api_server)}" - ) + config = parse_yaml_shell_output(f"{cmd} config view --raw --minify -o yaml") - return ( - self - if self.context_name in contexts_for_api_server - else self.with_context(contexts_for_api_server[0]) - ) + return KubeInterface(config, defaults=defaults, context_name=context_name) class AbstractServiceAccountRegistry(WithLogging, ABC): @@ -1582,7 +1489,12 @@ def spark_submit( submit_cmd = f"{self.defaults.spark_submit} {' '.join(submit_args)}" self.logger.debug(submit_cmd) - with environ(KUBECONFIG=self.kube_interface.kube_config_file): + + envs = {} + if self.kube_interface.kube_config_file: + envs["KUBECONFIG"] = self.kube_interface.kube_config_file + + with environ(**envs): os.system(submit_cmd) def spark_shell( @@ -1633,7 +1545,12 @@ def spark_shell( submit_cmd = f"{self.defaults.spark_shell} {' '.join(submit_args)}" self.logger.debug(submit_cmd) - with environ(KUBECONFIG=self.kube_interface.kube_config_file): + + envs = {} + if self.kube_interface.kube_config_file: + envs["KUBECONFIG"] = self.kube_interface.kube_config_file + + with environ(**envs): os.system(f"touch {self.defaults.scala_history_file}") os.system(submit_cmd) @@ -1680,7 +1597,12 @@ def pyspark_shell( submit_cmd = f"{self.defaults.pyspark} {' '.join(submit_args)}" self.logger.debug(submit_cmd) - with environ(KUBECONFIG=self.kube_interface.kube_config_file): + + envs = {} + if self.kube_interface.kube_config_file: + envs["KUBECONFIG"] = self.kube_interface.kube_config_file + + with environ(**envs): os.system(submit_cmd) def prefix_optional_detected_driver_host(self, conf: PropertyFile): diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index fd87ea7..56791a8 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -34,7 +34,7 @@ def _get_kube_namespaces(interface): @pytest.fixture def kubeinterface(defs_with_kubeconf): - interface = KubeInterface(defs_with_kubeconf.kube_config) + interface = KubeInterface(defs_with_kubeconf.kube_config, defs_with_kubeconf) ns_before = _get_kube_namespaces(interface) yield interface ns_after = _get_kube_namespaces(interface) diff --git a/tests/integration/test_registry.py b/tests/integration/test_registry.py index c7c1c5c..570fec4 100644 --- a/tests/integration/test_registry.py +++ b/tests/integration/test_registry.py @@ -174,12 +174,11 @@ def test_merge_configurations(): @pytest.mark.usefixtures("integration_test") def test_kube_interface(kubeinterface): context = str(uuid.uuid4()) - kubectl_cmd = str(uuid.uuid4()) k = kubeinterface.autodetect() assert k.context_name == "microk8s" - assert k.cluster.get("server") == "https://127.0.0.1:16443" - k2 = k.select_by_master("https://127.0.0.1:16443") + # assert (k.single_config.cluster.server == "https://127.0.0.1:16443") + + k2 = k.select_by_master(k.single_config.cluster.server) assert k2.context_name == "microk8s" assert k.with_context(context).context_name == context - assert k.with_kubectl_cmd(kubectl_cmd).kubectl_cmd == kubectl_cmd diff --git a/tests/unittest/test_domain.py b/tests/unittest/test_domain.py index efd9451..68b0521 100644 --- a/tests/unittest/test_domain.py +++ b/tests/unittest/test_domain.py @@ -1,4 +1,5 @@ import logging +import os import tempfile import uuid @@ -38,6 +39,22 @@ def test_defaults(): assert defaults.pyspark, f"{spark_home}/bin/pyspark" +def test_defaults_kube_config(): + """ + Validates defaults passed in as environment. + """ + + from spark8t.utils import environ + + d = Defaults(dict()) + + assert d.kube_config is None + + with environ(KUBECONFIG="my-kube-config"): + d = Defaults(dict(os.environ)) + assert d.kube_config == "my-kube-config" + + def test_service_account(): """ Validates service account including defending namespace and account name against overrides. diff --git a/tests/unittest/test_services.py b/tests/unittest/test_services.py index 9fe47ee..4be298f 100644 --- a/tests/unittest/test_services.py +++ b/tests/unittest/test_services.py @@ -187,7 +187,6 @@ def test_kube_interface(): username3 = str(uuid.uuid4()) context3 = str(uuid.uuid4()) token3 = str(uuid.uuid4()) - test_kubectl_cmd = str(uuid.uuid4()) kubeconfig_yaml = { "apiVersion": "v1", @@ -247,26 +246,26 @@ def test_kube_interface(): ], } - k = KubeInterface(kube_config_file=kubeconfig_yaml) + k = KubeInterface(kube_config_file=kubeconfig_yaml, defaults=defaults) assert k.context_name == context2 assert k.with_context(context3).context_name == context3 - assert k.with_context(context3).context.get("cluster") == f"{context3}-cluster" - assert k.with_kubectl_cmd(test_kubectl_cmd).kubectl_cmd == test_kubectl_cmd - assert k.kube_config == kubeconfig_yaml + assert ( + k.with_context(context3).single_config.context.cluster == f"{context3}-cluster" + ) - assert context1 in k.available_contexts - assert context2 in k.available_contexts - assert context3 in k.available_contexts - assert len(k.available_contexts) == 3 + assert context1 in k.kube_config.contexts + assert context2 in k.kube_config.contexts + assert context3 in k.kube_config.contexts + assert len(k.kube_config.contexts) == 3 - current_context = k.context - assert current_context.get("cluster") == f"{context2}-cluster" - assert current_context.get("user") == f"{username2}" + current_context = k.single_config.context + assert current_context.cluster == f"{context2}-cluster" + assert current_context.user == f"{username2}" - current_cluster = k.cluster - assert current_cluster.get("certificate-authority-data") == f"{test_id}-2" - assert current_cluster.get("server") == f"https://0.0.0.0:{test_id}-2" + current_cluster = k.single_config.cluster + assert current_cluster.certificate_auth_data == f"{test_id}-2" + assert current_cluster.server == f"https://0.0.0.0:{test_id}-2" def test_lightkube(tmp_kubeconf): @@ -283,19 +282,21 @@ def test_lightkube(tmp_kubeconf): assert k.context_name == context2 assert k.with_context(context3).context_name == context3 - assert k.with_context(context3).context.get("cluster") == f"{context3}-cluster" + assert ( + k.with_context(context3).single_config.context.cluster == f"{context3}-cluster" + ) - assert context1 in k.available_contexts - assert context2 in k.available_contexts - assert context3 in k.available_contexts - assert len(k.available_contexts) == 3 + assert context1 in k.kube_config.contexts + assert context2 in k.kube_config.contexts + assert context3 in k.kube_config.contexts + assert len(k.kube_config.contexts) == 3 - current_context = k.context - assert current_context.get("cluster") == f"{context2}-cluster" - assert current_context.get("user") == f"{username2}" + current_context = k.single_config.context + assert current_context.cluster == f"{context2}-cluster" + assert current_context.user == f"{username2}" - current_cluster = k.cluster - assert current_cluster.get("server") == "https://0.0.0.1:9090" + current_cluster = k.single_config.cluster + assert current_cluster.server == "https://0.0.0.1:9090" def test_lightkube_get_secret(mocker, tmp_kubeconf): @@ -325,9 +326,7 @@ def side_effect(*args, **kwargs): assert conf_value == secret_result["data"][conf_key] -def test_kube_interface_get_secret(mocker): - mock_yaml_safe_load = mocker.patch("yaml.safe_load") - mock_open = mocker.patch("builtins.open") +def test_kube_interface_get_secret(mocker, tmp_path): mock_subprocess = mocker.patch("subprocess.check_output") # mock logic @@ -370,9 +369,12 @@ def side_effect(*args, **kwargs): "users": [{"name": f"{username}", "user": {"token": f"{token}"}}], } - kubeconfig_yaml_str = yaml.dump(kubeconfig_yaml, sort_keys=False) + kube_config_file = os.path.join(tmp_path, kubeconfig) + + with open(kube_config_file, "w") as fid: + yaml.dump(kubeconfig_yaml, fid, sort_keys=False) - cmd_get_secret = f"kubectl --kubeconfig {kubeconfig} --namespace {namespace} --context {context} get secret {secret_name} --ignore-not-found -o yaml " + cmd_get_secret = f"kubectl --kubeconfig {kube_config_file} --namespace {namespace} --context {context} get secret {secret_name} --ignore-not-found -o yaml" output_get_secret_yaml = { "apiVersion": "v1", "data": {conf_key: conf_value_base64_encoded}, @@ -393,12 +395,11 @@ def side_effect(*args, **kwargs): cmd_get_secret: output_get_secret, } - mock_yaml_safe_load.side_effect = [kubeconfig_yaml, output_get_secret_yaml] - - with patch("builtins.open", mock_open(read_data=kubeconfig_yaml_str)): - k = KubeInterface(kube_config_file=kubeconfig) - secret_result = k.get_secret(secret_name, namespace) - assert conf_value == secret_result["data"][conf_key] + secret_mock = mocker.patch("spark8t.utils.parse_yaml_shell_output") + secret_mock.side_effect = output_get_secret + k = KubeInterface(kube_config_file=kube_config_file, defaults=defaults) + secret_result = k.get_secret(secret_name, namespace) + assert conf_value == secret_result["data"][conf_key] mock_subprocess.assert_any_call( cmd_get_secret, shell=True, stderr=subprocess.STDOUT @@ -546,13 +547,10 @@ def test_lightkube_remove_label_role_binding(mocker, tmp_kubeconf): ) -def test_kube_interface_set_label(mocker): - mock_yaml_safe_load = mocker.patch("yaml.safe_load") - mock_open = mocker.patch("builtins.open") +def test_kube_interface_set_label(mocker, tmp_path): mock_subprocess = mocker.patch("subprocess.check_output") # mock logic - def side_effect(*args, **kwargs): return values[args[0]] @@ -591,20 +589,22 @@ def side_effect(*args, **kwargs): "users": [{"name": f"{username}", "user": {"token": f"{token}"}}], } - kubeconfig_yaml_str = yaml.dump(kubeconfig_yaml, sort_keys=False) + kube_config_file = os.path.join(tmp_path, kubeconfig) + + with open(kube_config_file, "w") as fid: + yaml.dump(kubeconfig_yaml, fid, sort_keys=False) + + cmd_set_label = f"kubectl --kubeconfig {kube_config_file} --namespace {namespace} --context {context} label {resource_type} {resource_name} {label} -o yaml" - cmd_set_label = f"kubectl --kubeconfig {kubeconfig} --namespace {namespace} --context {context} label {resource_type} {resource_name} {label} -o yaml " - output_set_label_yaml = {} output_set_label = "0".encode("utf-8") values = { cmd_set_label: output_set_label, } - mock_yaml_safe_load.side_effect = [kubeconfig_yaml, output_set_label_yaml] - - with patch("builtins.open", mock_open(read_data=kubeconfig_yaml_str)): - k = KubeInterface(kube_config_file=kubeconfig) - k.set_label(resource_type, resource_name, label, namespace) + set_label_mock = mocker.patch("spark8t.utils.parse_yaml_shell_output") + set_label_mock.side_effect = output_set_label + k = KubeInterface(kube_config_file=kube_config_file, defaults=defaults) + k.set_label(resource_type, resource_name, label, namespace) mock_subprocess.assert_any_call(cmd_set_label, shell=True, stderr=subprocess.STDOUT) @@ -855,9 +855,7 @@ def test_lightkube_delete_role_binding(mocker, tmp_kubeconf): ) -def test_kube_interface_create(mocker): - mock_yaml_safe_load = mocker.patch("yaml.safe_load") - mock_open = mocker.patch("builtins.open") +def test_kube_interface_create(mocker, tmp_path): mock_subprocess = mocker.patch("subprocess.check_output") # mock logic @@ -898,32 +896,32 @@ def side_effect(*args, **kwargs): "users": [{"name": f"{username}", "user": {"token": f"{token}"}}], } - kubeconfig_yaml_str = yaml.dump(kubeconfig_yaml, sort_keys=False) + kube_config_file = os.path.join(tmp_path, kubeconfig) - cmd_create = f"kubectl --kubeconfig {kubeconfig} --namespace {namespace} --context {context} create {resource_type} {resource_name} --k1=v1 --k2=v21 --k2=v22 -o name " - output_create_yaml = {} + with open(kube_config_file, "w") as fid: + yaml.dump(kubeconfig_yaml, fid, sort_keys=False) + + cmd_create = f"kubectl --kubeconfig {kube_config_file} --namespace {namespace} --context {context} create {resource_type} {resource_name} --k1=v1 --k2=v21 --k2=v22 -o name" output_create = "0".encode("utf-8") values = { cmd_create: output_create, } - mock_yaml_safe_load.side_effect = [kubeconfig_yaml, output_create_yaml] + create_mock = mocker.patch("spark8t.utils.parse_yaml_shell_output") + create_mock.side_effect = output_create - with patch("builtins.open", mock_open(read_data=kubeconfig_yaml_str)): - k = KubeInterface(kube_config_file=kubeconfig) - k.create( - resource_type, - resource_name, - namespace, - **{"k1": "v1", "k2": ["v21", "v22"]}, - ) + k = KubeInterface(kube_config_file=kube_config_file, defaults=defaults) + k.create( + resource_type, + resource_name, + namespace, + **{"k1": "v1", "k2": ["v21", "v22"]}, + ) mock_subprocess.assert_any_call(cmd_create, shell=True, stderr=subprocess.STDOUT) -def test_kube_interface_delete(mocker): - mock_yaml_safe_load = mocker.patch("yaml.safe_load") - mock_open = mocker.patch("builtins.open") +def test_kube_interface_delete(mocker, tmp_path): mock_subprocess = mocker.patch("subprocess.check_output") # mock logic @@ -964,20 +962,43 @@ def side_effect(*args, **kwargs): "users": [{"name": f"{username}", "user": {"token": f"{token}"}}], } - kubeconfig_yaml_str = yaml.dump(kubeconfig_yaml, sort_keys=False) + kube_config_file = os.path.join(tmp_path, kubeconfig) + + with open(kube_config_file, "w") as fid: + yaml.dump(kubeconfig_yaml, fid, sort_keys=False) - cmd_delete = f"kubectl --kubeconfig {kubeconfig} --namespace {namespace} --context {context} delete {resource_type} {resource_name} --ignore-not-found -o name " - output_delete_yaml = {} + cmd_delete = f"kubectl --kubeconfig {kube_config_file} --namespace {namespace} --context {context} delete {resource_type} {resource_name} --ignore-not-found -o name" output_delete = "0".encode("utf-8") values = { cmd_delete: output_delete, } - mock_yaml_safe_load.side_effect = [kubeconfig_yaml, output_delete_yaml] + delete_mock = mocker.patch("spark8t.utils.parse_yaml_shell_output") + delete_mock.side_effect = output_delete - with patch("builtins.open", mock_open(read_data=kubeconfig_yaml_str)): - k = KubeInterface(kube_config_file=kubeconfig) - k.delete(resource_type, resource_name, namespace) + k = KubeInterface(kube_config_file=kube_config_file, defaults=defaults) + k.delete(resource_type, resource_name, namespace) + + mock_subprocess.assert_any_call(cmd_delete, shell=True, stderr=subprocess.STDOUT) + + +def test_kube_interface_delete_no_kubeconfig(mocker): + mock_subprocess = mocker.patch("subprocess.check_output") + + # mock logic + def side_effect(*args, **kwargs): + return "0".encode("utf-8") # values[args[0]] + + mock_subprocess.side_effect = side_effect + + namespace = str(uuid.uuid4()) + resource_type = str(uuid.uuid4()) + resource_name = str(uuid.uuid4()) + + cmd_delete = f"kubectl --namespace {namespace} delete {resource_type} {resource_name} --ignore-not-found -o name" + + k = KubeInterface(kube_config_file=None, defaults=defaults) + k.delete(resource_type, resource_name, namespace) mock_subprocess.assert_any_call(cmd_delete, shell=True, stderr=subprocess.STDOUT) @@ -1035,10 +1056,9 @@ def side_effect(*args, **kwargs): k.get_service_account(resource_name) -def test_kube_interface_get_service_accounts(mocker): - mock_yaml_safe_load = mocker.patch("yaml.safe_load") - mock_open = mocker.patch("builtins.open") +def test_kube_interface_get_service_accounts(mocker, tmp_path): mock_subprocess = mocker.patch("subprocess.check_output") + test_id = str(uuid.uuid4()) kubeconfig = str(uuid.uuid4()) username = str(uuid.uuid4()) @@ -1072,9 +1092,12 @@ def test_kube_interface_get_service_accounts(mocker): "users": [{"name": f"{username}", "user": {"token": f"{token}"}}], } - kubeconfig_yaml_str = yaml.dump(kubeconfig_yaml, sort_keys=False) + kube_config_file = os.path.join(tmp_path, kubeconfig) - cmd_get_sa = f"kubectl --kubeconfig {kubeconfig} --context {context} get serviceaccount -l {label1} -l {label2} -n {namespace} -o yaml " + with open(kube_config_file, "w") as fid: + yaml.dump(kubeconfig_yaml, fid, sort_keys=False) + + cmd_get_sa = f"kubectl --kubeconfig {kube_config_file} --context {context} get serviceaccount -l {label1} -l {label2} -n {namespace} -o yaml" output_get_sa_yaml = { "apiVersion": "v1", "items": [ @@ -1105,28 +1128,24 @@ def side_effect(*args, **kwargs): mock_subprocess.side_effect = side_effect - mock_yaml_safe_load.side_effect = [kubeconfig_yaml, output_get_sa_yaml] - - with patch("builtins.open", mock_open(read_data=kubeconfig_yaml_str)): - k = KubeInterface(kube_config_file=kubeconfig) - sa_list = k.get_service_accounts(namespace, labels) - assert sa_list[0].get("metadata").get("name") == username - assert sa_list[0].get("metadata").get("namespace") == namespace + k = KubeInterface(kube_config_file=kube_config_file, defaults=defaults) + sa_list = k.get_service_accounts(namespace, labels) + assert sa_list[0].get("metadata").get("name") == username + assert sa_list[0].get("metadata").get("namespace") == namespace mock_subprocess.assert_any_call(cmd_get_sa, shell=True, stderr=subprocess.STDOUT) -def test_kube_interface_autodetect(mocker): - mock_yaml_safe_load = mocker.patch("yaml.safe_load") - mock_open = mocker.patch("builtins.open") +def test_kube_interface_autodetect(mocker, tmp_path): mock_subprocess = mocker.patch("subprocess.check_output") - test_id = str(uuid.uuid4()) + kubeconfig = str(uuid.uuid4()) + + test_id = str(uuid.uuid4()) username = str(uuid.uuid4()) namespace = str(uuid.uuid4()) context = str(uuid.uuid4()) token = str(uuid.uuid4()) - kubectl_cmd_str = str(uuid.uuid4()) kubeconfig_yaml = { "apiVersion": "v1", @@ -1151,10 +1170,13 @@ def test_kube_interface_autodetect(mocker): "users": [{"name": f"{username}", "user": {"token": f"{token}"}}], } - kubeconfig_yaml_str = yaml.dump(kubeconfig_yaml, sort_keys=False) + kube_config_file = os.path.join(tmp_path, kubeconfig) + + with open(kube_config_file, "w") as fid: + yaml.dump(kubeconfig_yaml, fid, sort_keys=False) cmd_autodetect = ( - f"{kubectl_cmd_str} --context {context} config view --minify -o yaml" + f"kubectl --context {context} config view --raw " "--minify -o yaml" ) output_autodetect_yaml = { "apiVersion": "v1", @@ -1187,88 +1209,16 @@ def side_effect(*args, **kwargs): return output_autodetect mock_subprocess.side_effect = side_effect - mock_yaml_safe_load.side_effect = [kubeconfig_yaml, output_autodetect_yaml] - with patch("builtins.open", mock_open(read_data=kubeconfig_yaml_str)): - k = KubeInterface(kube_config_file=kubeconfig) - ki = k.autodetect(context, kubectl_cmd_str) - assert ki.context_name == context - assert ki.kubectl_cmd == kubectl_cmd_str + ki = KubeInterface.autodetect(context, defaults) + assert ki.context_name == context + assert ki.kubectl_cmd == "kubectl" mock_subprocess.assert_any_call( cmd_autodetect, shell=True, stderr=subprocess.STDOUT ) -def test_kube_interface_select_by_master(mocker): - mock_yaml_safe_load = mocker.patch("yaml.safe_load") - mock_open = mocker.patch("builtins.open") - mocker.patch("subprocess.check_output") - test_id = str(uuid.uuid4()) - kubeconfig = str(uuid.uuid4()) - username = str(uuid.uuid4()) - namespace = str(uuid.uuid4()) - context = str(uuid.uuid4()) - token = str(uuid.uuid4()) - - kubeconfig_yaml = { - "apiVersion": "v1", - "clusters": [ - { - "cluster": { - "certificate-authority-data": f"{test_id}", - "server": f"https://0.0.0.0:{test_id}", - }, - "name": f"{context}-cluster", - } - ], - "contexts": [ - { - "context": {"cluster": f"{context}-cluster", "user": f"{username}"}, - "name": f"{context}", - } - ], - "current-context": f"{context}", - "kind": "Config", - "preferences": {}, - "users": [{"name": f"{username}", "user": {"token": f"{token}"}}], - } - - kubeconfig_yaml_str = yaml.dump(kubeconfig_yaml, sort_keys=False) - - output_select_by_master_yaml = { - "apiVersion": "v1", - "items": [ - { - "apiVersion": "v1", - "kind": "ServiceAccount", - "metadata": { - "creationTimestamp": "2022-11-21T14:32:06Z", - "labels": { - MANAGED_BY_LABELNAME: SPARK8S_LABEL, - PRIMARY_LABELNAME: "1", - }, - "name": f"{username}", - "namespace": f"{namespace}", - "resourceVersion": "321848", - "uid": "87ef7231-8106-4a36-b545-d8cf167788a6", - }, - } - ], - "kind": "List", - "metadata": {"resourceVersion": ""}, - } - - mock_yaml_safe_load.side_effect = [ - kubeconfig_yaml, - output_select_by_master_yaml, - ] - - with patch("builtins.open", mock_open(read_data=kubeconfig_yaml_str)): - k = KubeInterface(kube_config_file=kubeconfig) - assert k == k.select_by_master(f"https://0.0.0.0:{test_id}") - - def test_k8s_registry_retrieve_account_configurations(mocker): mock_kube_interface = mocker.patch("spark8t.services.KubeInterface") data = {"k": "v"} From 245654c21a3bd5aefeb0c794718a0ee9fd44bbc2 Mon Sep 17 00:00:00 2001 From: Enrico Deusebio Date: Thu, 25 Jan 2024 20:06:54 +0100 Subject: [PATCH 2/2] address reviews --- spark8t/services.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/spark8t/services.py b/spark8t/services.py index 3d20302..bc19675 100644 --- a/spark8t/services.py +++ b/spark8t/services.py @@ -49,10 +49,6 @@ class AbstractKubeInterface(WithLogging, metaclass=ABCMeta): """Abstract class for implementing Kubernetes Interface.""" - defaults: Defaults - kube_config_file: Union[None, str, Dict[str, Any]] - context_name: Optional[str] = None - def __init__( self, kube_config_file: Union[None, str, Dict[str, Any]],