Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce k8s token volume projection, reload stale token in client #640

Merged
merged 2 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/platform-operator/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,7 @@ release: {{ .Release.Name | quote }}
{{- $host := include "platformOperator.cluster.host" . -}}
{{- printf "https://metrics.%s" $host -}}
{{- end -}}

{{/*
Directory under which the Kubernetes API credentials (token and CA certificate)
are mounted inside the controller container.
*/}}
{{- define "platformOperator.kubeAuthMountRoot" -}}
{{- "/var/run/secrets/kubernetes.io/serviceaccount" -}}
{{- end -}}
20 changes: 18 additions & 2 deletions charts/platform-operator/templates/controller-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ spec:
- name: NP_KUBE_URL
value: https://kubernetes.default.svc
- name: NP_KUBE_CERT_AUTHORITY_PATH
value: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
value: {{ include "platformOperator.kubeAuthMountRoot" . }}/ca.crt
- name: NP_KUBE_AUTH_TYPE
value: token
- name: NP_KUBE_AUTH_TOKEN_PATH
value: /var/run/secrets/kubernetes.io/serviceaccount/token
value: {{ include "platformOperator.kubeAuthMountRoot" . }}/token
- name: HELM_EXPERIMENTAL_OCI
value: "1"
- name: NP_HELM_PLATFORM_CHART_VERSION
Expand Down Expand Up @@ -89,7 +89,23 @@ spec:
port: liveness-port
initialDelaySeconds: 10
periodSeconds: 3
volumeMounts:
# The name must match a volume declared under `volumes`; the only one declared
# there is the projected `kube-api-data` volume (token + CA certificate).
- mountPath: {{ include "platformOperator.kubeAuthMountRoot" . }}
  name: kube-api-data
  readOnly: true
{{- if .Values.imagePullSecrets }}
imagePullSecrets: {{ toYaml .Values.imagePullSecrets | nindent 6 }}
{{- end }}
priorityClassName: {{ .Release.Namespace }}-operator-hook
# Projected volume bundling the credentials the controller uses to talk to the
# Kubernetes API. The file names produced here (`token`, `ca.crt`) are the ones
# appended to the mount root in NP_KUBE_AUTH_TOKEN_PATH and
# NP_KUBE_CERT_AUTHORITY_PATH.
volumes:
- name: kube-api-data
projected:
sources:
- serviceAccountToken:
# Requested token lifetime in seconds (one hour).
# NOTE(review): the kubelet is expected to rotate the file before expiry, so
# the client must re-read it — confirm against the Kubernetes docs.
expirationSeconds: 3600
path: token
- configMap:
# ConfigMap holding the cluster CA bundle; only its `ca.crt` key is projected.
name: kube-root-ca.crt
items:
- key: ca.crt
path: ca.crt
193 changes: 101 additions & 92 deletions platform_operator/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def __init__(
trace_configs: list[aiohttp.TraceConfig] | None = None,
) -> None:
self._config = config
self._token = config.auth_token
self._trace_configs = trace_configs
self._session: aiohttp.ClientSession | None = None
self._endpoints = Endpoints(config.url)
Expand Down Expand Up @@ -167,11 +168,15 @@ def _create_ssl_context(self) -> ssl.SSLContext | None:
return ssl_context

async def __aenter__(self) -> "KubeClient":
    """Enter the async context: run ``_init()`` and hand back this client."""
    await self._init()
    return self

async def _init(self) -> None:
headers = {}
if self._config.auth_type == KubeClientAuthType.TOKEN:
assert self._config.auth_token or self._config.auth_token_path
if self._config.auth_token:
headers = {"Authorization": "Bearer " + self._config.auth_token}
assert self._token or self._config.auth_token_path
if self._token:
headers = {"Authorization": "Bearer " + self._token}
if self._config.auth_token_path:
headers = {
"Authorization": "Bearer "
Expand All @@ -189,51 +194,66 @@ async def __aenter__(self) -> "KubeClient":
trace_configs=self._trace_configs,
headers=headers,
)
return self

async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
    """Leave the async context by closing the client.

    The exception details passed by ``async with`` are accepted but not
    inspected; exceptions are never suppressed (the method returns ``None``).
    """
    await self._close()

async def _close(self) -> None:
    """Close the current HTTP session and drop the reference to it.

    Asserts that a session exists; afterwards ``self._session`` is ``None``.
    """
    session = self._session
    assert session
    await session.close()
    self._session = None

async def _reload_http_client(self) -> None:
    """Recreate the HTTP session so that refreshed credentials are picked up.

    Closes the current session and runs ``_init()`` again. The cached token is
    discarded only when a token file path is configured; a statically
    configured token has no other source, and clearing it would make the
    ``_init()`` assertion (token or token path must be set) fail instead of
    letting the caller see the original HTTP error.
    """
    await self._close()
    if self._config.auth_token_path:
        # Presumably `_init()` re-reads the token from this file — the cached
        # value may be stale, so forget it.
        self._token = None
    await self._init()

async def _request(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Send a request through the session and return the decoded JSON body.

    Positional and keyword arguments are forwarded to
    ``self._session.request``. The keyword ``doing_retry`` is internal: it is
    stripped before the call and marks a request that is already a retry.

    If the server answers 401 on the first attempt, the HTTP client is
    reloaded and the request is retried once.

    Raises:
        aiohttp.ClientResponseError: for any other error status, or when the
            retried request fails as well.
    """
    assert self._session, "client is not initialized"
    doing_retry = kwargs.pop("doing_retry", False)
    try:
        async with self._session.request(*args, **kwargs) as response:
            response.raise_for_status()
            return await response.json()
    except aiohttp.ClientResponseError as e:
        # `status` replaces the deprecated `code` attribute.
        if e.status != 401 or doing_retry:
            raise
    # K8s SA's token might be stale, need to refresh it and retry. This runs
    # only after the failed response has been released, so the old session is
    # not closed while one of its responses is still open.
    await self._reload_http_client()
    return await self._request(*args, doing_retry=True, **kwargs)

async def get_node(self, name: str) -> Node:
assert self._session
async with self._session.get(self._endpoints.node(name)) as response:
response.raise_for_status()
payload = await response.json()
return Node(payload)
payload = await self._request(method="get", url=self._endpoints.node(name))
return Node(payload)

async def create_namespace(self, name: str) -> None:
assert self._session
async with self._session.post(
self._endpoints.namespaces, json={"metadata": {"name": name}}
) as response:
response.raise_for_status()
await self._request(
method="post",
url=self._endpoints.namespaces,
json={"metadata": {"name": name}},
)

async def delete_namespace(self, name: str) -> None:
assert self._session
async with self._session.delete(
self._endpoints.namespace(name), json={"propagationPolicy": "Background"}
) as response:
response.raise_for_status()
await self._request(
method="delete",
url=self._endpoints.namespace(name),
json={"propagationPolicy": "Background"},
)

async def get_service(self, namespace: str, name: str) -> Service:
assert self._session
async with self._session.get(
self._endpoints.service(namespace, name)
) as response:
response.raise_for_status()
payload = await response.json()
return Service(payload)
payload = await self._request(
method="get",
url=self._endpoints.service(namespace, name),
)
return Service(payload)

async def get_service_account(self, namespace: str, name: str) -> dict[str, Any]:
assert self._session
async with self._session.get(
self._endpoints.service_account(namespace, name)
) as response:
response.raise_for_status()
payload = await response.json()
return payload
return await self._request(
method="get",
url=self._endpoints.service_account(namespace, name),
)

async def update_service_account(
self,
Expand All @@ -250,45 +270,41 @@ async def update_service_account(
data["imagePullSecrets"] = [{"name": name} for name in image_pull_secrets]
if not data:
return
assert self._session
async with self._session.patch(
self._endpoints.service_account(namespace, name),
await self._request(
method="patch",
url=self._endpoints.service_account(namespace, name),
headers={"Content-Type": "application/merge-patch+json"},
data=json.dumps(data),
) as response:
response.raise_for_status()
)

async def create_secret(self, namespace: str, payload: dict[str, Any]) -> None:
assert self._session
async with self._session.post(
self._endpoints.secrets(namespace), json=payload
) as response:
response.raise_for_status()
await self._request(
method="post",
url=self._endpoints.secrets(namespace),
json=payload,
)

async def update_secret(
self, namespace: str, name: str, payload: dict[str, Any]
) -> None:
assert self._session
async with self._session.put(
self._endpoints.secret(namespace, name), json=payload
) as response:
response.raise_for_status()
await self._request(
method="put",
url=self._endpoints.secret(namespace, name),
json=payload,
)

async def delete_secret(self, namespace: str, name: str) -> None:
assert self._session
async with self._session.delete(
self._endpoints.secret(namespace, name)
) as response:
response.raise_for_status()
await self._request(
method="delete",
url=self._endpoints.secret(namespace, name),
)

async def get_secret(self, namespace: str, name: str) -> Secret:
assert self._session
async with self._session.get(
self._endpoints.secret(namespace, name)
) as response:
response.raise_for_status()
payload = await response.json()
return Secret(self._decode_secret_data(payload))
payload = await self._request(
method="get",
url=self._endpoints.secret(namespace, name),
)
return Secret(self._decode_secret_data(payload))

def _encode_secret_data(self, secret: dict[str, Any]) -> dict[str, Any]:
secret = dict(**secret)
Expand Down Expand Up @@ -320,13 +336,11 @@ async def get_pods(
query["labelSelector"] = ",".join(
f"{key}={value}" for key, value in label_selector.items()
)
assert self._session
async with self._session.get(
self._endpoints.pods(namespace).with_query(**query)
) as response:
response.raise_for_status()
payload = await response.json()
return payload["items"]
payload = await self._request(
method="get",
url=self._endpoints.pods(namespace).with_query(**query),
)
return payload["items"]

async def wait_till_pods_deleted(
self,
Expand All @@ -341,43 +355,38 @@ async def wait_till_pods_deleted(
await asyncio.sleep(interval_secs)

async def create_platform(self, namespace: str, payload: dict[str, Any]) -> None:
assert self._session
async with self._session.post(
self._endpoints.platforms(namespace=namespace), json=payload
) as response:
response.raise_for_status()
await self._request(
method="post",
url=self._endpoints.platforms(namespace=namespace),
json=payload,
)

async def delete_platform(self, namespace: str, name: str) -> None:
assert self._session
async with self._session.delete(
self._endpoints.platform(namespace, name)
) as response:
response.raise_for_status()
await self._request(
method="delete", url=self._endpoints.platform(namespace, name)
)

async def get_platform_status(
self, namespace: str, name: str
) -> dict[str, Any] | None:
assert self._session
async with self._session.get(
self._endpoints.platform(namespace, name) / "status"
) as response:
response.raise_for_status()
payload = await response.json()
if "status" not in payload:
return None
status_payload = payload["status"]
return status_payload
payload = await self._request(
method="get",
url=self._endpoints.platform(namespace, name) / "status",
)
if "status" not in payload:
return None
status_payload = payload["status"]
return status_payload

async def update_platform_status(
self, namespace: str, name: str, payload: dict[str, Any]
) -> None:
assert self._session
async with self._session.patch(
self._endpoints.platform(namespace, name) / "status",
await self._request(
method="patch",
url=self._endpoints.platform(namespace, name) / "status",
headers={"Content-Type": "application/merge-patch+json"},
data=json.dumps({"status": payload}),
) as response:
response.raise_for_status()
)

async def acquire_lock(
self,
Expand Down