From 637c14802d2f05aaccea1988f88ed58326d605d8 Mon Sep 17 00:00:00 2001 From: Tomas Pazderka Date: Tue, 10 Nov 2020 10:00:53 +0100 Subject: [PATCH] Enforce alg verification in id_token --- src/oic/oic/consumer.py | 2 + tests/test_oic_consumer.py | 135 +++++++++++++++++++++++++++++++++++-- 2 files changed, 133 insertions(+), 4 deletions(-) diff --git a/src/oic/oic/consumer.py b/src/oic/oic/consumer.py index bbe6c5c7..647cbe4a 100644 --- a/src/oic/oic/consumer.py +++ b/src/oic/oic/consumer.py @@ -400,6 +400,8 @@ def parse_authz( _log_info("response: %s" % sanitize(query)) + if "algs" not in kwargs: + kwargs["algs"] = self.sign_enc_algs("id_token") if "code" in self.consumer_config["response_type"]: aresp, _state = self._parse_authz(query, **kwargs) diff --git a/tests/test_oic_consumer.py b/tests/test_oic_consumer.py index 541a5cc7..1f131705 100644 --- a/tests/test_oic_consumer.py +++ b/tests/test_oic_consumer.py @@ -10,6 +10,7 @@ from jwkest.jwk import SYMKey from oic.oauth2.message import MissingSigningKey +from oic.oauth2.message import WrongSigningAlgorithm from oic.oic import DEF_SIGN_ALG from oic.oic import Server from oic.oic import response_types_to_grant_types @@ -463,7 +464,7 @@ def test_complete_auth_token_idtoken(self): _state = "state0" self.consumer.consumer_config["response_type"] = ["id_token", "token"] self.consumer.registration_response = RegistrationResponse( - id_token_signed_response_alg="RS256" + id_token_signed_response_alg="HS256" ) self.consumer.authz_req = {} # Store AuthzReq with state as key @@ -484,7 +485,7 @@ def test_complete_auth_token_idtoken(self): location = ( "https://example.com/cb?state=state0&access_token=token&token_type=bearer&" "scope=openid&id_token={}".format( - token.to_jwt(key=KC_RSA.keys(), algorithm="RS256") + token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256") ) ) with responses.RequestsMock() as rsps: @@ -508,10 +509,83 @@ def test_complete_auth_token_idtoken(self): parsed = urlparse(result.headers["location"]) with freeze_time("2019-08-09 11:00:00"): - part = self.consumer.parse_authz( - query=parsed.query, algs=self.consumer.sign_enc_algs("id_token") + part = self.consumer.parse_authz(query=parsed.query) + auth = part[0] + atr = part[1] + assert part[2] is None + + assert auth is None + assert isinstance(atr, AccessTokenResponse) + assert _eq( + atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"] + ) + + with freeze_time("2019-08-09 11:00:00"): + self.consumer.verify_id_token( + atr["id_token"], self.consumer.authz_req[atr["state"]] ) + auth = part[0] + atr = part[1] + idt = part[2] + assert isinstance(part, tuple) + assert auth is None + assert isinstance(atr, AccessTokenResponse) + assert _eq( + atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"] + ) + assert isinstance(idt, IdToken) + + def test_complete_auth_token_idtoken_no_alg_config(self): + _state = "state0" + self.consumer.consumer_config["response_type"] = ["id_token", "token"] + self.consumer.provider_info = ProviderConfigurationResponse( + issuer="https://example.com" + ) # abs min + self.consumer.authz_req = {} # Store AuthzReq with state as key + + args = { + "client_id": self.consumer.client_id, + "response_type": self.consumer.consumer_config["response_type"], + "scope": ["openid"], + "nonce": "nonce", + } + token = IdToken( + iss="https://example.com", + aud="client_1", + sub="some_sub", + exp=1565348600, + iat=1565348300, + nonce="nonce", + ) + location = ( + "https://example.com/cb?state=state0&access_token=token&token_type=bearer&" + "scope=openid&id_token={}".format( + token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256") + ) + ) + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://example.com/authorization", + status=302, + headers={"location": location}, + ) + result = self.consumer.do_authorization_request( + state=_state, request_args=args + ) + query = parse_qs(urlparse(result.request.url).query) + assert query["client_id"] == ["client_1"] + assert query["scope"] == ["openid"] + assert query["response_type"] == ["id_token token"] + assert query["state"] == ["state0"] + assert query["nonce"] == ["nonce"] + assert query["redirect_uri"] == ["https://example.com/cb"] + + parsed = urlparse(result.headers["location"]) + + with freeze_time("2019-08-09 11:00:00"): + part = self.consumer.parse_authz(query=parsed.query, algs={"sign": "HS256"}) auth = part[0] atr = part[1] idt = part[2] @@ -524,6 +598,59 @@ def test_complete_auth_token_idtoken(self): ) assert isinstance(idt, IdToken) + def test_complete_auth_token_idtoken_cipher_downgrade(self): + _state = "state0" + self.consumer.consumer_config["response_type"] = ["id_token", "token"] + self.consumer.provider_info = ProviderConfigurationResponse( + issuer="https://example.com" + ) # abs min + self.consumer.authz_req = {} # Store AuthzReq with state as key + + args = { + "client_id": self.consumer.client_id, + "response_type": self.consumer.consumer_config["response_type"], + "scope": ["openid"], + "nonce": "nonce", + } + token = IdToken( + iss="https://example.com", + aud="client_1", + sub="some_sub", + exp=1565348600, + iat=1565348300, + nonce="nonce", + ) + # Downgrade the algorithm to `none` + location = ( + "https://example.com/cb?state=state0&access_token=token&token_type=bearer&" + "scope=openid&id_token={}".format( + token.to_jwt(key=KC_RSA.keys(), algorithm="none") + ) + ) + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://example.com/authorization", + status=302, + headers={"location": location}, + ) + result = self.consumer.do_authorization_request( + state=_state, request_args=args + ) + query = parse_qs(urlparse(result.request.url).query) + assert query["client_id"] == ["client_1"] + assert query["scope"] == ["openid"] + assert query["response_type"] == ["id_token token"] + assert query["state"] == ["state0"] + assert query["nonce"] == ["nonce"] + assert query["redirect_uri"] == ["https://example.com/cb"] + + parsed = urlparse(result.headers["location"]) + + with freeze_time("2019-08-09 11:00:00"): + with pytest.raises(WrongSigningAlgorithm): + part = self.consumer.parse_authz(query=parsed.query) + def test_userinfo(self): _state = "state0"