From 8677e60797c53c013195711faff80c7db54c75e5 Mon Sep 17 00:00:00 2001 From: Yevhenii Semendiak Date: Thu, 30 Mar 2023 20:46:23 +0300 Subject: [PATCH 1/2] enforce token volume projection, reload staled token in client --- .../platform-operator/templates/_helpers.tpl | 4 + .../templates/controller-deployment.yaml | 20 +- platform_operator/kube_client.py | 193 +++++++++--------- 3 files changed, 123 insertions(+), 94 deletions(-) diff --git a/charts/platform-operator/templates/_helpers.tpl b/charts/platform-operator/templates/_helpers.tpl index cbfdb853..03f20ac3 100644 --- a/charts/platform-operator/templates/_helpers.tpl +++ b/charts/platform-operator/templates/_helpers.tpl @@ -120,3 +120,7 @@ release: {{ .Release.Name | quote }} {{- $host := include "platformOperator.cluster.host" . -}} {{- printf "https://metrics.%s" $host -}} {{- end -}} + +{{- define "platformOperator.kubeAuthMountRoot" -}} +{{- printf "/var/run/secrets/kubernetes.io/serviceaccount" -}} +{{- end -}} diff --git a/charts/platform-operator/templates/controller-deployment.yaml b/charts/platform-operator/templates/controller-deployment.yaml index ea39be44..7c87ecb0 100644 --- a/charts/platform-operator/templates/controller-deployment.yaml +++ b/charts/platform-operator/templates/controller-deployment.yaml @@ -51,11 +51,11 @@ spec: - name: NP_KUBE_URL value: https://kubernetes.default.svc - name: NP_KUBE_CERT_AUTHORITY_PATH - value: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + value: {{ include "platformOperator.kubeAuthMountRoot" . }}/ca.crt - name: NP_KUBE_AUTH_TYPE value: token - name: NP_KUBE_AUTH_TOKEN_PATH - value: /var/run/secrets/kubernetes.io/serviceaccount/token + value: {{ include "platformOperator.kubeAuthMountRoot" . }}/token - name: HELM_EXPERIMENTAL_OCI value: "1" - name: NP_HELM_PLATFORM_CHART_VERSION @@ -89,7 +89,23 @@ spec: port: liveness-port initialDelaySeconds: 10 periodSeconds: 3 + volumeMounts: + - mountPath: {{ include "platformOperator.kubeAuthMountRoot" . }} + name: kube-api-token + readOnly: true {{- if .Values.imagePullSecrets }} imagePullSecrets: {{ toYaml .Values.imagePullSecrets | nindent 6 }} {{- end }} priorityClassName: {{ .Release.Namespace }}-operator-hook + volumes: + - name: kube-api-data + projected: + sources: + - serviceAccountToken: + expirationSeconds: 3600 + path: token + - configMap: + name: kube-root-ca.crt + items: + - key: ca.crt + path: ca.crt diff --git a/platform_operator/kube_client.py b/platform_operator/kube_client.py index 666d2cb0..8a429243 100644 --- a/platform_operator/kube_client.py +++ b/platform_operator/kube_client.py @@ -140,6 +140,7 @@ def __init__( trace_configs: list[aiohttp.TraceConfig] | None = None, ) -> None: self._config = config + self._token = config.auth_token self._trace_configs = trace_configs self._session: aiohttp.ClientSession | None = None self._endpoints = Endpoints(config.url) @@ -167,11 +168,15 @@ def _create_ssl_context(self) -> ssl.SSLContext | None: return ssl_context async def __aenter__(self) -> "KubeClient": + await self._init() + return self + + async def _init(self) -> None: headers = {} if self._config.auth_type == KubeClientAuthType.TOKEN: - assert self._config.auth_token or self._config.auth_token_path - if self._config.auth_token: - headers = {"Authorization": "Bearer " + self._config.auth_token} + assert self._token or self._config.auth_token_path + if self._token: + headers = {"Authorization": "Bearer " + self._token} if self._config.auth_token_path: headers = { "Authorization": "Bearer " @@ -189,51 +194,66 @@ async def __aenter__(self) -> "KubeClient": trace_configs=self._trace_configs, headers=headers, ) - return self async def __aexit__(self, *args: Any, **kwargs: Any) -> None: + await self._close() + + async def _close(self) -> None: assert self._session await self._session.close() self._session = None + async def _reload_http_client(self) -> None: + await self._close() + self._token = None + await self._init() + + async def _request(self, *args: Any, **kwargs: Any) -> dict[str, Any]: + assert self._session, "client is not initialized" + doing_retry = kwargs.pop("doing_retry", False) + async with self._session.request(*args, **kwargs) as response: + try: + response.raise_for_status() + payload = await response.json() + except aiohttp.ClientResponseError as e: + if e.code != 401 or doing_retry: + raise + # K8s SA's token might be stale, need to refresh it and retry + await self._reload_http_client() + kwargs["doing_retry"] = True + payload = await self._request(*args, **kwargs) + return payload + async def get_node(self, name: str) -> Node: - assert self._session - async with self._session.get(self._endpoints.node(name)) as response: - response.raise_for_status() - payload = await response.json() - return Node(payload) + payload = await self._request(method="get", url=self._endpoints.node(name)) + return Node(payload) async def create_namespace(self, name: str) -> None: - assert self._session - async with self._session.post( - self._endpoints.namespaces, json={"metadata": {"name": name}} - ) as response: - response.raise_for_status() + await self._request( + method="post", + url=self._endpoints.namespaces, + json={"metadata": {"name": name}}, + ) async def delete_namespace(self, name: str) -> None: - assert self._session - async with self._session.delete( - self._endpoints.namespace(name), json={"propagationPolicy": "Background"} - ) as response: - response.raise_for_status() + await self._request( + method="delete", + url=self._endpoints.namespace(name), + json={"propagationPolicy": "Background"}, + ) async def get_service(self, namespace: str, name: str) -> Service: - assert self._session - async with self._session.get( - self._endpoints.service(namespace, name) - ) as response: - response.raise_for_status() - payload = await response.json() - return Service(payload) + payload = await self._request( + method="get", + url=self._endpoints.service(namespace, name), + ) + return Service(payload) async def get_service_account(self, namespace: str, name: str) -> dict[str, Any]: - assert self._session - async with self._session.get( - self._endpoints.service_account(namespace, name) - ) as response: - response.raise_for_status() - payload = await response.json() - return payload + return await self._request( + method="get", + url=self._endpoints.service_account(namespace, name), + ) async def update_service_account( self, @@ -250,45 +270,41 @@ async def update_service_account( data["imagePullSecrets"] = [{"name": name} for name in image_pull_secrets] if not data: return - assert self._session - async with self._session.patch( - self._endpoints.service_account(namespace, name), + await self._request( + method="patch", + url=self._endpoints.service_account(namespace, name), headers={"Content-Type": "application/merge-patch+json"}, data=json.dumps(data), - ) as response: - response.raise_for_status() + ) async def create_secret(self, namespace: str, payload: dict[str, Any]) -> None: - assert self._session - async with self._session.post( - self._endpoints.secrets(namespace), json=payload - ) as response: - response.raise_for_status() + await self._request( + method="post", + url=self._endpoints.secrets(namespace), + json=payload, + ) async def update_secret( self, namespace: str, name: str, payload: dict[str, Any] ) -> None: - assert self._session - async with self._session.put( - self._endpoints.secret(namespace, name), json=payload - ) as response: - response.raise_for_status() + await self._request( + method="put", + url=self._endpoints.secret(namespace, name), + json=payload, + ) async def delete_secret(self, namespace: str, name: str) -> None: - assert self._session - async with self._session.delete( - self._endpoints.secret(namespace, name) - ) as response: - response.raise_for_status() + await self._request( + method="delete", + url=self._endpoints.secret(namespace, name), + ) async def get_secret(self, namespace: str, name: str) -> Secret: - assert self._session - async with self._session.get( - self._endpoints.secret(namespace, name) - ) as response: - response.raise_for_status() - payload = await response.json() - return Secret(self._decode_secret_data(payload)) + payload = await self._request( + method="get", + url=self._endpoints.secret(namespace, name), + ) + return Secret(self._decode_secret_data(payload)) def _encode_secret_data(self, secret: dict[str, Any]) -> dict[str, Any]: secret = dict(**secret) @@ -320,13 +336,11 @@ async def get_pods( query["labelSelector"] = ",".join( f"{key}={value}" for key, value in label_selector.items() ) - assert self._session - async with self._session.get( - self._endpoints.pods(namespace).with_query(**query) - ) as response: - response.raise_for_status() - payload = await response.json() - return payload["items"] + payload = await self._request( + method="get", + url=self._endpoints.pods(namespace).with_query(**query), + ) + return payload["items"] async def wait_till_pods_deleted( self, @@ -341,43 +355,38 @@ async def wait_till_pods_deleted( await asyncio.sleep(interval_secs) async def create_platform(self, namespace: str, payload: dict[str, Any]) -> None: - assert self._session - async with self._session.post( - self._endpoints.platforms(namespace=namespace), json=payload - ) as response: - response.raise_for_status() + await self._request( + method="post", + url=self._endpoints.platforms(namespace=namespace), + json=payload, + ) async def delete_platform(self, namespace: str, name: str) -> None: - assert self._session - async with self._session.delete( - self._endpoints.platform(namespace, name) - ) as response: - response.raise_for_status() + await self._request( + method="delete", url=self._endpoints.platform(namespace, name) + ) async def get_platform_status( self, namespace: str, name: str ) -> dict[str, Any] | None: - assert self._session - async with self._session.get( - self._endpoints.platform(namespace, name) / "status" - ) as response: - response.raise_for_status() - payload = await response.json() - if "status" not in payload: - return None - status_payload = payload["status"] - return status_payload + payload = await self._request( + method="get", + url=self._endpoints.platform(namespace, name) / "status", + ) + if "status" not in payload: + return None + status_payload = payload["status"] + return status_payload async def update_platform_status( self, namespace: str, name: str, payload: dict[str, Any] ) -> None: - assert self._session - async with self._session.patch( - self._endpoints.platform(namespace, name) / "status", + await self._request( + method="patch", + url=self._endpoints.platform(namespace, name) / "status", headers={"Content-Type": "application/merge-patch+json"}, data=json.dumps({"status": payload}), - ) as response: - response.raise_for_status() + ) async def acquire_lock( self, From e5025f696e589e30ea73708c503afc241af0a569 Mon Sep 17 00:00:00 2001 From: Yevhenii Semendiak Date: Tue, 4 Apr 2023 15:35:12 +0300 Subject: [PATCH 2/2] fix chart --- charts/platform-operator/templates/controller-deployment.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charts/platform-operator/templates/controller-deployment.yaml b/charts/platform-operator/templates/controller-deployment.yaml index 7c87ecb0..3d06fe15 100644 --- a/charts/platform-operator/templates/controller-deployment.yaml +++ b/charts/platform-operator/templates/controller-deployment.yaml @@ -91,7 +91,7 @@ spec: periodSeconds: 3 volumeMounts: - mountPath: {{ include "platformOperator.kubeAuthMountRoot" . }} - name: kube-api-token + name: kube-api-data readOnly: true {{- if .Values.imagePullSecrets }} imagePullSecrets: {{ toYaml .Values.imagePullSecrets | nindent 6 }}