diff --git a/jwt/api_jws.py b/jwt/api_jws.py index 0c61c7df..3e79d41a 100644 --- a/jwt/api_jws.py +++ b/jwt/api_jws.py @@ -5,8 +5,8 @@ from collections import Mapping from .algorithms import Algorithm, get_default_algorithms # NOQA -from .compat import text_type -from .exceptions import DecodeError, InvalidAlgorithmError +from .compat import string_types, text_type +from .exceptions import DecodeError, InvalidAlgorithmError, InvalidTokenError from .utils import base64url_decode, base64url_encode, merge_dict @@ -79,6 +79,7 @@ def encode(self, payload, key, algorithm='HS256', headers=None, header = {'typ': self.header_typ, 'alg': algorithm} if headers: + self._validate_headers(headers) header.update(headers) json_header = json.dumps( @@ -125,7 +126,10 @@ def get_unverified_header(self, jwt): Note: The signature is not verified so the header parameters should not be fully trusted until signature verification is complete """ - return self._load(jwt)[2] + headers = self._load(jwt)[2] + self._validate_headers(headers) + + return headers def _load(self, jwt): if isinstance(jwt, text_type): @@ -180,6 +184,13 @@ def _verify_signature(self, payload, signing_input, header, signature, except KeyError: raise InvalidAlgorithmError('Algorithm not supported') + def _validate_headers(self, headers): + if 'kid' in headers: + self._validate_kid(headers['kid']) + + def _validate_kid(self, kid): + if not isinstance(kid, string_types): + raise InvalidTokenError('Key ID header parameter must be a string') _jws_global_obj = PyJWS() encode = _jws_global_obj.encode diff --git a/tests/test_api_jws.py b/tests/test_api_jws.py index 9aa8b85b..2079bef0 100644 --- a/tests/test_api_jws.py +++ b/tests/test_api_jws.py @@ -6,7 +6,7 @@ from jwt.algorithms import Algorithm from jwt.api_jws import PyJWS from jwt.exceptions import ( - DecodeError, InvalidAlgorithmError + DecodeError, InvalidAlgorithmError, InvalidTokenError ) from jwt.utils import base64url_decode @@ -367,12 +367,24 @@ def test_decode_with_algo_none_and_verify_false_should_pass(self, jws, payload): def test_get_unverified_header_returns_header_values(self, jws, payload): jws_message = jws.encode(payload, key='secret', algorithm='HS256', - headers={'kid': 123}) + headers={'kid': 'toomanysecrets'}) header = jws.get_unverified_header(jws_message) assert 'kid' in header - assert header['kid'] == 123 + assert header['kid'] == 'toomanysecrets' + + def test_get_unverified_header_fails_on_bad_header_types(self, jws, payload): + # Contains a bad kid value (int 123 instead of string) + example_jws = ( + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6MTIzfQ' + '.eyJzdWIiOiIxMjM0NTY3ODkwIn0' + '.vs2WY54jfpKP3JGC73Vq5YlMsqM5oTZ1ZydT77SiZSk') + + with pytest.raises(InvalidTokenError) as exc: + jws.get_unverified_header(example_jws) + + assert 'Key ID header parameter must be a string' == str(exc.value) @pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library') def test_encode_decode_with_rsa_sha256(self, jws, payload): @@ -597,3 +609,14 @@ def test_encode_headers_parameter_adds_headers(self, jws, payload): assert 'testheader' in header_obj assert header_obj['testheader'] == headers['testheader'] + + def test_encode_fails_on_invalid_kid_types(self, jws, payload): + with pytest.raises(InvalidTokenError) as exc: + jws.encode(payload, 'secret', headers={'kid': 123}) + + assert 'Key ID header parameter must be a string' == str(exc.value) + + with pytest.raises(InvalidTokenError) as exc: + jws.encode(payload, 'secret', headers={'kid': None}) + + assert 'Key ID header parameter must be a string' == str(exc.value)