From 32d5e775f3b549f54a957ca50701c0a411935908 Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Tue, 12 Nov 2024 19:57:10 +0200 Subject: [PATCH 01/10] Consistency for token decoding --- src/keycloak/keycloak_openid.py | 62 +++++++++++++++------------------ 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index ae9cd82..b1fe8ec 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -581,6 +581,20 @@ def introspect(self, token, rpt=None, token_type_hint=None): ) return raise_error_from_response(data_raw, KeycloakPostError) + def _verify_token(self, token, key, **kwargs): + # keep the function free of IO + # this way it can be used by `decode_token` and `a_decode_token` + if key is not None: + leeway = kwargs.pop("leeway", 60) + full_jwt = jwt.JWT(jwt=token, **kwargs) + full_jwt.leeway = leeway + full_jwt.validate(key) + return jwt.json_decode(full_jwt.claims) + else: + full_jwt = jwt.JWT(jwt=token, **kwargs) + full_jwt.token.objects["valid"] = True + return json.loads(full_jwt.token.payload.decode("utf-8")) + def decode_token(self, token, validate: bool = True, **kwargs): """Decode user token. @@ -603,26 +617,12 @@ def decode_token(self, token, validate: bool = True, **kwargs): :returns: Decoded token :rtype: dict """ - if validate: - if "key" not in kwargs: - key = ( - "-----BEGIN PUBLIC KEY-----\n" - + self.public_key() - + "\n-----END PUBLIC KEY-----" - ) - key = jwk.JWK.from_pem(key.encode("utf-8")) - kwargs["key"] = key + key = kwargs.pop("key", None) + if validate and key is None: + key = "-----BEGIN PUBLIC KEY-----\n" + self.public_key() + "\n-----END PUBLIC KEY-----" + key = jwk.JWK.from_pem(key.encode("utf-8")) - key = kwargs.pop("key") - leeway = kwargs.pop("leeway", 60) - full_jwt = jwt.JWT(jwt=token, **kwargs) - full_jwt.leeway = leeway - full_jwt.validate(key) - return jwt.json_decode(full_jwt.claims) - else: - full_jwt = jwt.JWT(jwt=token, **kwargs) - full_jwt.token.objects["valid"] = True - return json.loads(full_jwt.token.payload.decode("utf-8")) + return self._verify_token(token, key, **kwargs) def load_authorization_config(self, path): """Load Keycloak settings (authorization). @@ -1254,22 +1254,16 @@ async def a_decode_token(self, token, validate: bool = True, **kwargs): :returns: Decoded token :rtype: dict """ - if validate: - if "key" not in kwargs: - key = ( - "-----BEGIN PUBLIC KEY-----\n" - + await self.a_public_key() - + "\n-----END PUBLIC KEY-----" - ) - key = jwk.JWK.from_pem(key.encode("utf-8")) - kwargs["key"] = key + key = kwargs.pop("key", None) + if validate and key is None: + key = ( + "-----BEGIN PUBLIC KEY-----\n" + + await self.a_public_key() + + "\n-----END PUBLIC KEY-----" + ) + key = jwk.JWK.from_pem(key.encode("utf-8")) - full_jwt = jwt.JWT(jwt=token, **kwargs) - return jwt.json_decode(full_jwt.claims) - else: - full_jwt = jwt.JWT(jwt=token, **kwargs) - full_jwt.token.objects["valid"] = True - return json.loads(full_jwt.token.payload.decode("utf-8")) + return self._verify_token(token, key, **kwargs) async def a_load_authorization_config(self, path): """Load Keycloak settings (authorization) asynchronously. From 810141c348d71b649735d915438f1646a3587e88 Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Wed, 13 Nov 2024 08:59:24 +0200 Subject: [PATCH 02/10] Mark as staticmethod --- src/keycloak/keycloak_openid.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index b1fe8ec..cb597eb 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -581,7 +581,8 @@ def introspect(self, token, rpt=None, token_type_hint=None): ) return raise_error_from_response(data_raw, KeycloakPostError) - def _verify_token(self, token, key, **kwargs): + @staticmethod + def _verify_token(token, key, **kwargs): # keep the function free of IO # this way it can be used by `decode_token` and `a_decode_token` if key is not None: From dd08f3fd110d978366521b39bda0723cb95492e6 Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Wed, 13 Nov 2024 09:04:13 +0200 Subject: [PATCH 03/10] Helper function to convert key --- src/keycloak/keycloak_openid.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index cb597eb..e080e8a 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -596,6 +596,11 @@ def _verify_token(token, key, **kwargs): full_jwt.token.objects["valid"] = True return json.loads(full_jwt.token.payload.decode("utf-8")) + @staticmethod + def _public_key_to_jwk(key: str) -> jwk.JWK: + key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----" + return jwk.JWK.from_pem(key.encode("utf-8")) + def decode_token(self, token, validate: bool = True, **kwargs): """Decode user token. @@ -620,8 +625,7 @@ def decode_token(self, token, validate: bool = True, **kwargs): """ key = kwargs.pop("key", None) if validate and key is None: - key = "-----BEGIN PUBLIC KEY-----\n" + self.public_key() + "\n-----END PUBLIC KEY-----" - key = jwk.JWK.from_pem(key.encode("utf-8")) + key = self._public_key_to_jwk(self.public_key()) return self._verify_token(token, key, **kwargs) @@ -1257,12 +1261,7 @@ async def a_decode_token(self, token, validate: bool = True, **kwargs): """ key = kwargs.pop("key", None) if validate and key is None: - key = ( - "-----BEGIN PUBLIC KEY-----\n" - + await self.a_public_key() - + "\n-----END PUBLIC KEY-----" - ) - key = jwk.JWK.from_pem(key.encode("utf-8")) + key = self._public_key_to_jwk(await self.a_public_key()) return self._verify_token(token, key, **kwargs) From 056f5cee13b567e2b43f54cc1d202a62871d788f Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Wed, 13 Nov 2024 09:52:14 +0200 Subject: [PATCH 04/10] Refactor key handling --- src/keycloak/keycloak_openid.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index e080e8a..665d21c 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -28,7 +28,7 @@ class to handle authentication and token manipulation. """ import json -from typing import Optional +from typing import Optional, Union from jwcrypto import jwk, jwt @@ -582,9 +582,13 @@ def introspect(self, token, rpt=None, token_type_hint=None): return raise_error_from_response(data_raw, KeycloakPostError) @staticmethod - def _verify_token(token, key, **kwargs): + def _verify_token(token, key: Union[str, jwk.JWK, jwk.JWKSet, None], **kwargs): # keep the function free of IO # this way it can be used by `decode_token` and `a_decode_token` + if isinstance(key, str): + key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----" + key = jwk.JWK.from_pem(key.encode("utf-8")) + if key is not None: leeway = kwargs.pop("leeway", 60) full_jwt = jwt.JWT(jwt=token, **kwargs) @@ -596,11 +600,6 @@ def _verify_token(token, key, **kwargs): full_jwt.token.objects["valid"] = True return json.loads(full_jwt.token.payload.decode("utf-8")) - @staticmethod - def _public_key_to_jwk(key: str) -> jwk.JWK: - key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----" - return jwk.JWK.from_pem(key.encode("utf-8")) - def decode_token(self, token, validate: bool = True, **kwargs): """Decode user token. @@ -625,7 +624,7 @@ def decode_token(self, token, validate: bool = True, **kwargs): """ key = kwargs.pop("key", None) if validate and key is None: - key = self._public_key_to_jwk(self.public_key()) + key = self.public_key() return self._verify_token(token, key, **kwargs) @@ -1261,7 +1260,7 @@ async def a_decode_token(self, token, validate: bool = True, **kwargs): """ key = kwargs.pop("key", None) if validate and key is None: - key = self._public_key_to_jwk(await self.a_public_key()) + key = await self.a_public_key() return self._verify_token(token, key, **kwargs) From e22a89194abf03a29f603bb3d41a83c6dd6294d2 Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Sat, 16 Nov 2024 15:11:25 +0200 Subject: [PATCH 05/10] Add tests for validate=False --- tests/test_keycloak_openid.py | 75 ++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index f26c9b8..8efbeaa 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -4,6 +4,8 @@ from typing import Tuple from unittest import mock +import jwcrypto.jwk +import jwcrypto.jws import pytest from keycloak import KeycloakAdmin, KeycloakOpenID @@ -317,6 +319,39 @@ def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token +def test_decode_token_validate(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test decode token. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + token = oid.token(username=username, password=password) + access_token = token["access_token"] + decoded_access_token = oid.decode_token(token=access_token) + + key = oid.public_key() + key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----" + key = jwcrypto.jwk.JWK.from_pem(key.encode("utf-8")) + + invalid_access_token = access_token + "a" + with pytest.raises(jwcrypto.jws.InvalidJWSSignature): + decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=True) + + with pytest.raises(jwcrypto.jws.InvalidJWSSignature): + decoded_invalid_access_token = oid.decode_token( + token=invalid_access_token, validate=True, key=key + ) + + decoded_invalid_access_token = oid.decode_token(token=invalid_access_token, validate=False) + assert decoded_access_token == decoded_invalid_access_token + + decoded_invalid_access_token = oid.decode_token( + token=invalid_access_token, validate=False, key=key + ) + assert decoded_access_token == decoded_invalid_access_token + + def test_load_authorization_config(oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str]): """Test load authorization config. @@ -765,7 +800,7 @@ async def test_a_introspect(oid_with_credentials: Tuple[KeycloakOpenID, str, str @pytest.mark.asyncio async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): - """Test decode token. + """Test decode token asynchronously. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] @@ -781,6 +816,44 @@ async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, s assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token +@pytest.mark.asyncio +async def test_a_decode_token_validate(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test decode token asynchronously. + + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + """ + oid, username, password = oid_with_credentials + token = await oid.a_token(username=username, password=password) + access_token = token["access_token"] + decoded_access_token = await oid.a_decode_token(token=access_token) + + key = await oid.a_public_key() + key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----" + key = jwcrypto.jwk.JWK.from_pem(key.encode("utf-8")) + + invalid_access_token = access_token + "a" + with pytest.raises(jwcrypto.jws.InvalidJWSSignature): + decoded_invalid_access_token = await oid.a_decode_token( + token=invalid_access_token, validate=True + ) + + with pytest.raises(jwcrypto.jws.InvalidJWSSignature): + decoded_invalid_access_token = await oid.a_decode_token( + token=invalid_access_token, validate=True, key=key + ) + + decoded_invalid_access_token = await oid.a_decode_token( + token=invalid_access_token, validate=False + ) + assert decoded_access_token == decoded_invalid_access_token + + decoded_invalid_access_token = await oid.a_decode_token( + token=invalid_access_token, validate=False, key=key + ) + assert decoded_access_token == decoded_invalid_access_token + + @pytest.mark.asyncio async def test_a_load_authorization_config( oid_with_credentials_authz: Tuple[KeycloakOpenID, str, str] From 1ceecb8dae82de1afb61b65e5292a0d19365ddf8 Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Sat, 16 Nov 2024 15:17:04 +0200 Subject: [PATCH 06/10] Change test name --- tests/test_keycloak_openid.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_keycloak_openid.py b/tests/test_keycloak_openid.py index 8efbeaa..d28ba97 100644 --- a/tests/test_keycloak_openid.py +++ b/tests/test_keycloak_openid.py @@ -319,8 +319,8 @@ def test_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): assert decoded_refresh_token["typ"] == "Refresh", decoded_refresh_token -def test_decode_token_validate(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): - """Test decode token. +def test_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test decode token with an invalid token. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] @@ -817,8 +817,8 @@ async def test_a_decode_token(oid_with_credentials: Tuple[KeycloakOpenID, str, s @pytest.mark.asyncio -async def test_a_decode_token_validate(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): - """Test decode token asynchronously. +async def test_a_decode_token_invalid_token(oid_with_credentials: Tuple[KeycloakOpenID, str, str]): + """Test decode token asynchronously an invalid token. :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] From b5ab550c41f91408120cfa93328ea95f296fb73e Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Sat, 16 Nov 2024 15:17:14 +0200 Subject: [PATCH 07/10] Fix failing test --- src/keycloak/keycloak_openid.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 665d21c..923e126 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -623,8 +623,11 @@ def decode_token(self, token, validate: bool = True, **kwargs): :rtype: dict """ key = kwargs.pop("key", None) - if validate and key is None: - key = self.public_key() + if validate: + if key is None: + key = self.public_key() + else: + key = None return self._verify_token(token, key, **kwargs) @@ -1259,8 +1262,11 @@ async def a_decode_token(self, token, validate: bool = True, **kwargs): :rtype: dict """ key = kwargs.pop("key", None) - if validate and key is None: - key = await self.a_public_key() + if validate: + if key is None: + key = await self.a_public_key() + else: + key = None return self._verify_token(token, key, **kwargs) From ae5d746f793d04457615290105542705484b3f38 Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Sat, 16 Nov 2024 15:25:04 +0200 Subject: [PATCH 08/10] Remove special case for str --- src/keycloak/keycloak_openid.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 923e126..23381d2 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -582,12 +582,9 @@ def introspect(self, token, rpt=None, token_type_hint=None): return raise_error_from_response(data_raw, KeycloakPostError) @staticmethod - def _verify_token(token, key: Union[str, jwk.JWK, jwk.JWKSet, None], **kwargs): + def _verify_token(token, key: Union[jwk.JWK, jwk.JWKSet, None], **kwargs): # keep the function free of IO # this way it can be used by `decode_token` and `a_decode_token` - if isinstance(key, str): - key = "-----BEGIN PUBLIC KEY-----\n" + key + "\n-----END PUBLIC KEY-----" - key = jwk.JWK.from_pem(key.encode("utf-8")) if key is not None: leeway = kwargs.pop("leeway", 60) @@ -625,7 +622,12 @@ def decode_token(self, token, validate: bool = True, **kwargs): key = kwargs.pop("key", None) if validate: if key is None: - key = self.public_key() + key = ( + "-----BEGIN PUBLIC KEY-----\n" + + self.public_key() + + "\n-----END PUBLIC KEY-----" + ) + key = jwk.JWK.from_pem(key.encode("utf-8")) else: key = None @@ -1264,7 +1266,12 @@ async def a_decode_token(self, token, validate: bool = True, **kwargs): key = kwargs.pop("key", None) if validate: if key is None: - key = await self.a_public_key() + key = ( + "-----BEGIN PUBLIC KEY-----\n" + + await self.a_public_key() + + "\n-----END PUBLIC KEY-----" + ) + key = jwk.JWK.from_pem(key.encode("utf-8")) else: key = None From f0df38d028e4997d1e7008c4fcf998a8a553981d Mon Sep 17 00:00:00 2001 From: Cristian Betivu Date: Sat, 16 Nov 2024 15:30:09 +0200 Subject: [PATCH 09/10] Some docstring --- src/keycloak/keycloak_openid.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 23381d2..1ee655c 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -583,6 +583,14 @@ def introspect(self, token, rpt=None, token_type_hint=None): @staticmethod def _verify_token(token, key: Union[jwk.JWK, jwk.JWKSet, None], **kwargs): + """Decode and optionally validate a token. + + :param token: The token to verify + :param key: Which key should be used for validation. + If not provided, the validation is not performed and the token is implicitly valid. + :param kwargs: Additional keyword arguments for jwcrypto's JWT object + :returns: Decoded token + """ # keep the function free of IO # this way it can be used by `decode_token` and `a_decode_token` From 6cef19621770c1db7f3f3b9ae6783357c62a5b5f Mon Sep 17 00:00:00 2001 From: Richard Nemeth Date: Sun, 17 Nov 2024 10:56:23 +0100 Subject: [PATCH 10/10] docs: missing docstrings --- src/keycloak/keycloak_openid.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/keycloak/keycloak_openid.py b/src/keycloak/keycloak_openid.py index 1ee655c..91ed776 100644 --- a/src/keycloak/keycloak_openid.py +++ b/src/keycloak/keycloak_openid.py @@ -586,9 +586,12 @@ def _verify_token(token, key: Union[jwk.JWK, jwk.JWKSet, None], **kwargs): """Decode and optionally validate a token. :param token: The token to verify + :type token: str :param key: Which key should be used for validation. If not provided, the validation is not performed and the token is implicitly valid. + :type key: Union[jwk.JWK, jwk.JWKSet, None] :param kwargs: Additional keyword arguments for jwcrypto's JWT object + :type kwargs: dict :returns: Decoded token """ # keep the function free of IO