From c7d3c7c187579276345bcdac75ca1eba23761ed9 Mon Sep 17 00:00:00 2001 From: mbohlool Date: Wed, 30 Nov 2016 03:33:38 -0800 Subject: [PATCH] - Add context switch to kube config loader - Refactor kube config loader to be able to test it - Add test for kube config loader --- examples/example4.py | 57 ++++ kubernetes/config/__init__.py | 1 + kubernetes/config/kube_config.py | 330 +++++++++++++------- kubernetes/config/kube_config_test.py | 413 ++++++++++++++++++++++++++ 4 files changed, 689 insertions(+), 112 deletions(-) create mode 100644 examples/example4.py create mode 100644 kubernetes/config/kube_config_test.py diff --git a/examples/example4.py b/examples/example4.py new file mode 100644 index 0000000000..cc8cb6c5e9 --- /dev/null +++ b/examples/example4.py @@ -0,0 +1,57 @@ +# Copyright 2016 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from kubernetes import client, config +from kubernetes.client import configuration + + +def main(): + config_file = os.environ["HOME"] + '/.kube/config' + contexts, active_context = config.list_kube_config_context(config_file) + if not contexts: + print("Cannot find any context in kube-config file.") + return + for i in range(len(contexts)): + format_str = "%d. %s" + if contexts[i]['name'] == active_context['name']: + format_str = "* " + format_str + print(format_str % (i, contexts[i]['name'])) + context = input("Enter context number: ") + context = int(context) + if context not in range(len(contexts)): + print( + "Number out of range. Using default context %s." % + active_context) + return + else: + context_name = contexts[context]['name'] + + # Configs can be set in Configuration class directly or using helper + # utility + config.load_kube_config(os.environ["HOME"] + '/.kube/config', context_name) + + print("Active host is %s" % configuration.host) + + v1 = client.CoreV1Api() + print("Listing pods with their IPs:") + ret = v1.list_pod_for_all_namespaces(watch=False) + for i in ret.items: + print("%s\t%s\t%s" % + (i.status.pod_ip, i.metadata.namespace, i.metadata.name)) + + +if __name__ == '__main__': + main() diff --git a/kubernetes/config/__init__.py b/kubernetes/config/__init__.py index e52d8a4a6e..e487ebd74f 100644 --- a/kubernetes/config/__init__.py +++ b/kubernetes/config/__init__.py @@ -14,4 +14,5 @@ from .config_exception import ConfigException from .incluster_config import load_incluster_config +from .kube_config import list_kube_config_context from .kube_config import load_kube_config diff --git a/kubernetes/config/kube_config.py b/kubernetes/config/kube_config.py index cc1c483de5..dca521e52e 100644 --- a/kubernetes/config/kube_config.py +++ b/kubernetes/config/kube_config.py @@ -22,19 +22,26 @@ from kubernetes.client import configuration from oauth2client.client import GoogleCredentials -_temp_files = [] +from .incluster_config import ConfigException + +_temp_files = {} def _cleanup_temp_files(): - for f in _temp_files: - os.remove(f) + for k in _temp_files: + os.remove(_temp_files[k]) def _create_temp_file_with_content(content): if len(_temp_files) == 0: atexit.register(_cleanup_temp_files) + # Because we may change context several times, try to remember files we + # created and reuse them at a small memory cost. + content_key = content if isinstance(content, str) else str(content) + if content_key in _temp_files: + return _temp_files[content_key] _, name = tempfile.mkstemp() - _temp_files.append(name) + _temp_files[content_key] = name if isinstance(content, str): content = content.encode('utf8') with open(name, 'wb') as fd: @@ -42,131 +49,230 @@ def _create_temp_file_with_content(content): return name -def _file_from_file_or_data(o, file_key_name, data_key_name=None): - if not data_key_name: - data_key_name = file_key_name + "-data" - if data_key_name in o: - return _create_temp_file_with_content(o[data_key_name]) - if file_key_name in o: - return o[file_key_name] - - -def _data_from_file_or_data(o, file_key_name, data_key_name=None): - if not data_key_name: - data_key_name = file_key_name + "_data" - if data_key_name in o: - return o[data_key_name] - if file_key_name in o: - with open(o[file_key_name], 'r') as f: - data = f.read() - return data - - -def _load_gcp_token(user): - if 'auth-provider' not in user: - return - if 'name' not in user['auth-provider']: - return - if user['auth-provider']['name'] != 'gcp': - return - # Ignore configs in auth-provider and rely on GoogleCredentials - # caching and refresh mechanism. - # TODO: support gcp command based token ("cmd-path" config). - return (GoogleCredentials - .get_application_default() - .get_access_token() - .access_token) - - -def _load_authentication(user): - """Read authentication from kube-config user section. - - This function goes through various authetication methods in user section of - kubeconfig and stops if it founds a valid authentication method. The order - of authentication methods is: - - 1. GCP auth-provider - 2. token_data - 3. token field (point to a token file) - 4. username/password - """ - # Read authentication - token = _load_gcp_token(user) - if not token: - token = _data_from_file_or_data(user, 'tokenFile', 'token') - if token: - configuration.api_key['authorization'] = "bearer " + token - else: - if 'username' in user and 'password' in user: - configuration.api_key['authorization'] = urllib3.util.make_headers( - basic_auth=user['username'] + ':' + - user['password']).get('authorization') - - -def _load_cluster_info(cluster, user): - """Loads cluster information from kubeconfig such as host and SSL certs.""" - if 'server' in cluster: - configuration.host = cluster['server'] - if configuration.host.startswith("https"): - configuration.ssl_ca_cert = _file_from_file_or_data( - cluster, 'certificate-authority') - configuration.cert_file = _file_from_file_or_data( - user, 'client-certificate') - configuration.key_file = _file_from_file_or_data( - user, 'client-key') - - -class _node: - """Remembers each key's path and construct a relevant exception message - in case of missing keys.""" +class FileOrData(object): + """Utility class to read content of obj[%data_key_name] or file's + content of obj[%file_key_name] and represent it as file or data. + Note that the data is preferred. The obj[%file_key_name] will be used iff + obj['%data_key_name'] is not set or empty. Assumption is file content is + raw data and data field is base64 string.""" + + def __init__(self, obj, file_key_name, data_key_name=None): + if not data_key_name: + data_key_name = file_key_name + "-data" + self._file = None + self._data = None + if data_key_name in obj: + self._data = obj[data_key_name] + elif file_key_name in obj: + self._file = obj[file_key_name] + + @property + def file(self): + """If obj[%data_key_name] exists, return name of a file with base64 + decoded obj[%data_key_name] content otherwise obj[%file_key_name].""" + use_data_if_no_file = not self._file and self._data + if use_data_if_no_file: + self._file = _create_temp_file_with_content(self._data) + return self._file + + @property + def data(self): + """If obj[%data_key_name] exists, Return obj[%data_key_name] otherwise + base64 encoded string of obj[%file_key_name] file content.""" + use_file_if_no_data = not self._data and self._file + if use_file_if_no_data: + with open(self._file) as f: + self._data = bytes.decode( + base64.encodestring(str.encode(f.read()))) + return self._data + + +class KubeConfigLoader(object): + + def __init__(self, config_dict, active_context=None, + get_google_credentials=None, client_configuration=None): + self._config = ConfigNode('kube-config', config_dict) + self._current_context = None + self._user = None + self._cluster = None + self.set_active_context(active_context) + if get_google_credentials: + self._get_google_credentials = get_google_credentials + else: + self._get_google_credentials = lambda: ( + GoogleCredentials.get_application_default() + .get_access_token().access_token) + if client_configuration: + self._client_configuration = client_configuration + else: + self._client_configuration = configuration + + def set_active_context(self, context_name=None): + if context_name is None: + context_name = self._config['current-context'] + self._current_context = self._config['contexts'].get_with_name( + context_name) + if self._current_context['context'].safe_get('user'): + self._user = self._config['users'].get_with_name( + self._current_context['context']['user'])['user'] + else: + self._user = None + self._cluster = self._config['clusters'].get_with_name( + self._current_context['context']['cluster'])['cluster'] + + def _load_authentication(self): + """Read authentication from kube-config user section if exists. + + This function goes through various authetication methods in user + section of kubeconfig and stops if it founds a valid authentication + method. The order of authentication methods is: + + 1. GCP auth-provider + 2. token_data + 3. token field (point to a token file) + 4. username/password + """ + if not self._user: + return + if self._load_gcp_token(): + return + if self._load_user_token(): + return + self._load_userpass_token() + + def _load_gcp_token(self): + if 'auth-provider' not in self._user: + return + if 'name' not in self._user['auth-provider']: + return + if self._user['auth-provider']['name'] != 'gcp': + return + # Ignore configs in auth-provider and rely on GoogleCredentials + # caching and refresh mechanism. + # TODO: support gcp command based token ("cmd-path" config). + self.token = self._get_google_credentials() + return self.token + + def _load_user_token(self): + token = FileOrData(self._user, 'tokenFile', 'token').data + if token: + self.token = token + return True + + def _load_userpass_token(self): + if 'username' in self._user and 'password' in self._user: + self.token = urllib3.util.make_headers( + basic_auth=(self._user['username'] + ':' + + self._user['password'])).get('authorization') + return True + + def _load_cluster_info(self): + if 'server' in self._cluster: + self.host = self._cluster['server'] + if self.host.startswith("https"): + self.ssl_ca_cert = FileOrData( + self._cluster, 'certificate-authority').file + self.cert_file = FileOrData( + self._user, 'client-certificate').file + self.key_file = FileOrData(self._user, 'client-key').file + + def _set_config(self): + if hasattr(self, 'token'): + self._client_configuration.api_key['authorization'] = self.token + # copy these keys directly from self to configuration object + keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file'] + for key in keys: + if hasattr(self, key): + setattr(self._client_configuration, key, getattr(self, key)) + + def load_and_set(self): + self._load_authentication() + self._load_cluster_info() + self._set_config() + + def list_context(self): + contexts = [] + for c in self._config['contexts']: + contexts.append(c.value) + return contexts + + @property + def current_context(self): + return self._current_context.value + + +class ConfigNode: + """Remembers each conifg key's path and construct a relevant exception + message in case of missing keys. The assumption is all access keys are + present in a well-formed kube-config.""" def __init__(self, name, value): self._name = name self._value = value + @property + def value(self): + return self._value + + @property + def name(self): + return self._name + def __contains__(self, key): - return key in self._value + return key in self.value + + def __len__(self): + return len(self.value) + + def safe_get(self, key): + if (isinstance(self.value, list) and isinstance(key, int) or + key in self.value): + return self.value[key] def __getitem__(self, key): - if key in self._value: - v = self._value[key] - if isinstance(v, dict) or isinstance(v, list): - return _node('%s/%s' % (self._name, key), v) - else: - return v - raise Exception( - 'Invalid kube-config file. Expected key %s in %s' - % (key, self._name)) + v = self.safe_get(key) + if not v: + raise ConfigException( + 'Invalid kube-config file. Expect key %s in %s' + % (key, self._name)) + if isinstance(v, dict) or isinstance(v, list): + return ConfigNode('%s/%s' % (self._name, key), v) + else: + return v def get_with_name(self, name): - if not isinstance(self._value, list): - raise Exception( - 'Invalid kube-config file. Expected %s to be a list' + if not isinstance(self.value, list): + raise ConfigException( + 'Invalid kube-config file. Expect %s to be a list' % self._name) - for v in self._value: + for v in self.value: if 'name' not in v: - raise Exception( + raise ConfigException( 'Invalid kube-config file. ' - 'Expected all values in %s list to have \'name\' key' + 'Expect all values in %s list to have \'name\' key' % self._name) if v['name'] == name: - return _node('%s[name=%s]' % (self._name, name), v) - raise Exception( - "Cannot find object with name %s in %s list" % (name, self._name)) - + return ConfigNode('%s[name=%s]' % (self._name, name), v) + raise ConfigException( + 'Invalid kube-config file. ' + 'Expect object with name %s in %s list' % (name, self._name)) -def load_kube_config(config_file): - """Loads authentication and cluster information from kube-config file - and store them in kubernetes.client.configuration.""" +def list_kube_config_context(config_file): with open(config_file) as f: - config = _node('kube-config', yaml.load(f)) + loader = KubeConfigLoader(config_dict=yaml.load(f)) + return loader.list_context(), loader.current_context + - current_context = config['contexts'].get_with_name( - config['current-context'])['context'] - user = config['users'].get_with_name(current_context['user'])['user'] - cluster = config['clusters'].get_with_name( - current_context['cluster'])['cluster'] +def load_kube_config(config_file, context=None): + """Loads authentication and cluster information from kube-config file + and store them in kubernetes.client.configuration. + + :param config_file: Name of the kube-config file. + :param context: set the active context. If is set to None, current_context + from config file will be used. + """ - _load_cluster_info(cluster, user) - _load_authentication(user) + with open(config_file) as f: + KubeConfigLoader( + config_dict=yaml.load(f), active_context=context).load_and_set() diff --git a/kubernetes/config/kube_config_test.py b/kubernetes/config/kube_config_test.py new file mode 100644 index 0000000000..3b62545b0e --- /dev/null +++ b/kubernetes/config/kube_config_test.py @@ -0,0 +1,413 @@ +# Copyright 2016 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import os +import tempfile +import unittest + +from .incluster_config import ConfigException +from .kube_config import ConfigNode, FileOrData, KubeConfigLoader + + +def _base64(string): + return base64.encodestring(string.encode()).decode() + + +TEST_FILE_KEY = "file" +TEST_DATA_KEY = "data" +TEST_FILENAME = "testfilename" + +TEST_DATA = "testdata" +TEST_DATA_BASE64 = _base64(TEST_DATA) + +TEST_ANOTHER_DATA = "anothertestdata" +TEST_ANOTHER_DATA_BASE64 = _base64(TEST_ANOTHER_DATA) + +TEST_HOST = "testhost" +TEST_USERNAME = "me" +TEST_PASSWORD = "pass" +# token for me:pass +TEST_BASIC_TOKEN = "Basic bWU6cGFzcw==" + +TEST_SSL_HOST = "https://testhost" +TEST_CERTIFICATE_AUTH = "certauth" +TEST_CERTIFICATE_AUTH_BASE64 = _base64(TEST_CERTIFICATE_AUTH) +TEST_CLIENT_KEY = "clientkey" +TEST_CLIENT_KEY_BASE64 = _base64(TEST_CLIENT_KEY) +TEST_CLIENT_CERT = "clientcert" +TEST_CLIENT_CERT_BASE64 = _base64(TEST_CLIENT_CERT) + + +class TestRoot(unittest.TestCase): + + def setUp(self): + self._temp_files = [] + + def tearDown(self): + for f in self._temp_files: + os.remove(f) + + def _create_temp_file(self, content=""): + handler, name = tempfile.mkstemp() + self._temp_files.append(name) + os.write(handler, str.encode(content)) + os.close(handler) + return name + + +class TestFileOrData(TestRoot): + + def get_file_content(self, file): + with open(file) as f: + return f.read() + + def test_file_given_file(self): + obj = {TEST_FILE_KEY: TEST_FILENAME} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) + self.assertEqual(TEST_FILENAME, t.file) + + def test_file_fiven_data(self): + obj = {TEST_DATA_KEY: TEST_DATA_BASE64} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA, self.get_file_content(t.file)) + + def test_data_given_data(self): + obj = {TEST_DATA_KEY: TEST_DATA_BASE64} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA_BASE64, t.data) + + def test_data_given_file(self): + obj = { + TEST_FILE_KEY: self._create_temp_file(content=TEST_DATA)} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY) + self.assertEqual(TEST_DATA_BASE64, t.data) + + def test_data_given_file_and_data(self): + obj = { + TEST_DATA_KEY: TEST_DATA_BASE64, + TEST_FILE_KEY: self._create_temp_file( + content=TEST_ANOTHER_DATA)} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA_BASE64, t.data) + + def test_file_given_file_and_data(self): + obj = { + TEST_DATA_KEY: TEST_DATA_BASE64, + TEST_FILE_KEY: self._create_temp_file( + content=TEST_ANOTHER_DATA)} + t = FileOrData(obj=obj, file_key_name=TEST_FILE_KEY, + data_key_name=TEST_DATA_KEY) + self.assertEqual(TEST_DATA, self.get_file_content(t.file)) + + +class TestConfigNode(TestRoot): + + test_obj = {"key1": "test", "key2": ["a", "b", "c"], + "key3": {"inner_key": "inner_value"}, + "with_names": [{"name": "test_name", "value": "test_value"}, + {"name": "test_name2", + "value": {"key1", "test"}}, + {"name": "test_name3", "value": [1, 2, 3]}]} + + def test_normal_map_array_operations(self): + node = ConfigNode("test_obj", self.test_obj) + self.assertEqual("test", node['key1']) + self.assertEqual(4, len(node)) + + self.assertEqual("test_obj/key2", node['key2'].name) + self.assertEqual(["a", "b", "c"], node['key2'].value) + self.assertEqual("b", node['key2'][1]) + self.assertEqual(3, len(node['key2'])) + + self.assertEqual("test_obj/key3", node['key3'].name) + self.assertEqual({"inner_key": "inner_value"}, node['key3'].value) + self.assertEqual("inner_value", node['key3']["inner_key"]) + self.assertEqual(1, len(node['key3'])) + + def test_get_with_name(self): + node = ConfigNode("test_obj_with_names", + self.test_obj["with_names"]) + self.assertEqual( + "test_value", + node.get_with_name("test_name")["value"]) + self.assertTrue( + isinstance(node.get_with_name("test_name2"), ConfigNode)) + self.assertTrue( + isinstance(node.get_with_name("test_name3"), ConfigNode)) + self.assertEqual("test_obj_with_names[name=test_name2]", + node.get_with_name("test_name2").name) + self.assertEqual("test_obj_with_names[name=test_name3]", + node.get_with_name("test_name3").name) + + def expect_exception(self, func, message_part): + try: + func() + self.fail("Should fail.") + except ConfigException as e: + self.assertTrue( + message_part in e.message, "'%s' should be in '%s'" % + (message_part, e.message)) + + def test_invalid_key(self): + node = ConfigNode("test_obj", self.test_obj) + self.expect_exception(lambda: node['non-existance-key'], + "Expect key non-existance-key in test_obj") + self.expect_exception(lambda: node['key3']['non-existance-key'], + "Expect key non-existance-key in test_obj/key3") + self.expect_exception( + lambda: node['key2'].get_with_name('noname'), + "Expect all values in test_obj/key2 list to have \'name\' key") + self.expect_exception( + lambda: node['key3'].get_with_name('noname'), + "Expect test_obj/key3 to be a list") + self.expect_exception( + lambda: node['with_names'].get_with_name('noname'), + "Expect object with name noname in test_obj/with_names list") + + +class FakeConfig: + + FILE_KEYS = ["ssl_ca_cert", "key_file", "cert_file"] + + def __init__(self, token=None, **kwargs): + self.api_key = {} + if token: + self.api_key['authorization'] = token + + self.__dict__.update(kwargs) + + def __eq__(self, other): + if len(self.__dict__) != len(other.__dict__): + return + for k, v in self.__dict__.items(): + if k not in other.__dict__: + return + if k in self.FILE_KEYS: + try: + with open(v) as f1, open(other.__dict__[k]) as f2: + if f1.read() != f2.read(): + return + except IOError as e: + # fall back to only compare filenames in case we are testing + # passing filename to the config + if other.__dict__[k] != v: + return + else: + if other.__dict__[k] != v: + return + return True + + def __repr__(self): + rep = "\n" + for k, v in self.__dict__.items(): + val = v + if k in self.FILE_KEYS: + try: + with open(v) as f: + val = "FILE: %s" % str.decode(f.read()) + except IOError as e: + val = "ERROR: %s" % str(e) + rep += "\t%s: %s\n" % (k, val) + return "Config(%s\n)" % rep + + +class TestKubeConfigLoader(TestRoot): + TEST_KUBE_CONFIG = { + "contexts": [ + { + "name": "no_user", + "context": { + "cluster": "default" + } + }, + { + "name": "simple_token", + "context": { + "cluster": "default", + "user": "simple_token" + } + }, + { + "name": "gcp", + "context": { + "cluster": "default", + "user": "gcp" + } + }, + { + "name": "userpass", + "context": { + "cluster": "default", + "user": "userpass" + } + }, + { + "name": "ssl", + "context": { + "cluster": "ssl", + "user": "ssl" + } + }, + { + "name": "ssl-nofile", + "context": { + "cluster": "ssl-nofile", + "user": "ssl-nofile" + } + }, + ], + "clusters": [ + { + "name": "default", + "cluster": { + "server": TEST_HOST + } + }, + { + "name": "ssl-nofile", + "cluster": { + "server": TEST_SSL_HOST, + "certificate-authority": TEST_CERTIFICATE_AUTH, + } + }, + { + "name": "ssl", + "cluster": { + "server": TEST_SSL_HOST, + "certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64, + } + }, + ], + "users": [ + { + "name": "simple_token", + "user": { + "token": TEST_DATA_BASE64, + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + { + "name": "gcp", + "user": { + "auth-provider": { + "name": "gcp", + "access_token": "not_used", + }, + "token": TEST_DATA_BASE64, # should be ignored + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + { + "name": "userpass", + "user": { + "username": TEST_USERNAME, # should be ignored + "password": TEST_PASSWORD, # should be ignored + } + }, + { + "name": "ssl-nofile", + "user": { + "token": TEST_DATA_BASE64, + "client-certificate": TEST_CLIENT_CERT, + "client-key": TEST_CLIENT_KEY, + } + }, + { + "name": "ssl", + "user": { + "token": TEST_DATA_BASE64, + "client-certificate-data": TEST_CLIENT_CERT_BASE64, + "client-key-data": TEST_CLIENT_KEY_BASE64, + } + }, + ] + } + + def test_no_user_context(self): + expected = FakeConfig(host=TEST_HOST) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="no_user", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_simple_token(self): + expected = FakeConfig(host=TEST_HOST, token=TEST_DATA_BASE64) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="simple_token", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_gcp(self): + expected = FakeConfig(host=TEST_HOST, token=TEST_ANOTHER_DATA_BASE64) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="gcp", + client_configuration=actual, + get_google_credentials=lambda: TEST_ANOTHER_DATA_BASE64) \ + .load_and_set() + self.assertEqual(expected, actual) + + def test_userpass(self): + expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="userpass", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_ssl_no_certfiles(self): + expected = FakeConfig( + host=TEST_SSL_HOST, + token=TEST_DATA_BASE64, + cert_file=TEST_CLIENT_CERT, + key_file=TEST_CLIENT_KEY, + ssl_ca_cert=TEST_CERTIFICATE_AUTH + ) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="ssl-nofile", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + def test_ssl(self): + expected = FakeConfig( + host=TEST_SSL_HOST, + token=TEST_DATA_BASE64, + cert_file=self._create_temp_file(TEST_CLIENT_CERT), + key_file=self._create_temp_file(TEST_CLIENT_KEY), + ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH) + ) + actual = FakeConfig() + KubeConfigLoader( + config_dict=self.TEST_KUBE_CONFIG, + active_context="ssl", + client_configuration=actual).load_and_set() + self.assertEqual(expected, actual) + + +if __name__ == '__main__': + unittest.main()