diff --git a/jwt/__init__.py b/jwt/__init__.py index 56958480..01aed819 100644 --- a/jwt/__init__.py +++ b/jwt/__init__.py @@ -9,14 +9,11 @@ ) from .exceptions import ( DecodeError, - ExpiredSignature, ExpiredSignatureError, ImmatureSignatureError, InvalidAlgorithmError, - InvalidAudience, InvalidAudienceError, InvalidIssuedAtError, - InvalidIssuer, InvalidIssuerError, InvalidSignatureError, InvalidTokenError, @@ -54,14 +51,11 @@ "unregister_algorithm", # Exceptions "DecodeError", - "ExpiredSignature", "ExpiredSignatureError", "ImmatureSignatureError", "InvalidAlgorithmError", - "InvalidAudience", "InvalidAudienceError", "InvalidIssuedAtError", - "InvalidIssuer", "InvalidIssuerError", "InvalidSignatureError", "InvalidTokenError", diff --git a/jwt/api_jws.py b/jwt/api_jws.py index 63b004f3..26d1dfc2 100644 --- a/jwt/api_jws.py +++ b/jwt/api_jws.py @@ -1,6 +1,5 @@ import binascii import json -import warnings from collections.abc import Mapping from .algorithms import requires_cryptography # NOQA @@ -145,24 +144,13 @@ def decode( verify_signature = merged_options["verify_signature"] if verify_signature and not algorithms: - warnings.warn( - "It is strongly recommended that you pass in a " - + 'value for the "algorithms" argument when calling decode(). ' - + "This argument will be mandatory in a future version.", - DeprecationWarning, - stacklevel=2, + raise DecodeError( + 'It is required that you pass in a value for the "algorithms" argument when calling decode().' ) payload, signing_input, header, signature = self._load(jwt) - if not verify: - warnings.warn( - "The verify parameter is deprecated. " - "Please use verify_signature in options instead.", - DeprecationWarning, - stacklevel=2, - ) - elif verify_signature: + if verify_signature: self._verify_signature( payload, signing_input, header, signature, key, algorithms ) diff --git a/jwt/api_jwt.py b/jwt/api_jwt.py index 348dd747..e08d65c5 100644 --- a/jwt/api_jwt.py +++ b/jwt/api_jwt.py @@ -1,5 +1,4 @@ import json -import warnings from calendar import timegm from collections.abc import Iterable, Mapping from datetime import datetime, timedelta @@ -26,7 +25,6 @@ class PyJWT(PyJWS): header_type = "JWT" - deprecated_requires = ["require_exp", "require_iat", "require_nbf"] @staticmethod def _get_default_options(): @@ -76,29 +74,23 @@ def decode( self, jwt, # type: str key="", # type: str - verify=True, # type: bool algorithms=None, # type: List[str] options=None, # type: Dict complete=False, # type: bool **kwargs - ): - # type: (...) -> Dict[str, Any] - - if verify and not algorithms: - warnings.warn( - "It is strongly recommended that you pass in a " - + 'value for the "algorithms" argument when calling decode(). ' - + "This argument will be mandatory in a future version.", - DeprecationWarning, - stacklevel=2, - ) + ): # type: (...) -> Dict[str, Any] payload, _, _, _ = self._load(jwt) if options is None: - options = {"verify_signature": verify} + options = {"verify_signature": True} else: - options.setdefault("verify_signature", verify) + options.setdefault("verify_signature", True) + + if options["verify_signature"] and not algorithms: + raise DecodeError( + 'It is required that you pass in a value for the "algorithms" argument when calling decode().' + ) decoded = super().decode( jwt, @@ -119,7 +111,7 @@ def decode( if not isinstance(payload, dict): raise DecodeError("Invalid payload string: must be a json object") - if verify: + if options["verify_signature"]: merged_options = merge_dict(self.options, options) self._validate_claims(payload, merged_options, **kwargs) @@ -132,38 +124,11 @@ def decode( def _validate_claims( self, payload, options, audience=None, issuer=None, leeway=0, **kwargs ): - - if "verify_expiration" in kwargs: - options["verify_exp"] = kwargs.get("verify_expiration", True) - warnings.warn( - "The verify_expiration parameter is deprecated. " - "Please use verify_exp in options instead.", - DeprecationWarning, - stacklevel=3, - ) - - verify_claims = { - required - for required in self.deprecated_requires - if required in options - } - require_options = options.setdefault("require", []) - for opt in verify_claims: - opt_claim = opt.split("require_", 1)[1] - if options[opt]: - require_options.append(opt_claim) - warnings.warn( - "The {} parameter is deprecated. Please add {} to" - " the require list in options instead".format(opt, opt_claim), - DeprecationWarning, - stacklevel=3, - ) - if isinstance(leeway, timedelta): leeway = leeway.total_seconds() - if not isinstance(audience, (type(None), Iterable)): - raise TypeError("audience must be an iterable or None") + if not isinstance(audience, (bytes, str, type(None), Iterable)): + raise TypeError("audience must be a string, iterable, or None") self._validate_required_claims(payload, options) diff --git a/jwt/exceptions.py b/jwt/exceptions.py index abe088be..308899aa 100644 --- a/jwt/exceptions.py +++ b/jwt/exceptions.py @@ -64,9 +64,3 @@ class PyJWKSetError(PyJWTError): class PyJWKClientError(PyJWTError): pass - - -# Compatibility aliases (deprecated) -ExpiredSignature = ExpiredSignatureError -InvalidAudience = InvalidAudienceError -InvalidIssuer = InvalidIssuerError diff --git a/jwt/jwks_client.py b/jwt/jwks_client.py index c8f50e41..60aff02a 100644 --- a/jwt/jwks_client.py +++ b/jwt/jwks_client.py @@ -59,6 +59,8 @@ def get_signing_key(self, kid): return signing_key def get_signing_key_from_jwt(self, token): - unverified = decode_token(token, complete=True, verify=False) + unverified = decode_token( + token, complete=True, options={"verify_signature": False} + ) header = unverified.get("header") return self.get_signing_key(header.get("kid")) diff --git a/tests/keys/__init__.py b/tests/keys/__init__.py index 6027285e..2cf0b15d 100644 --- a/tests/keys/__init__.py +++ b/tests/keys/__init__.py @@ -13,7 +13,7 @@ def decode_value(val): def load_hmac_key(): - with open(os.path.join(BASE_PATH, "jwk_hmac.json")) as infile: + with open(os.path.join(BASE_PATH, "jwk_hmac.json"), "r") as infile: keyobj = json.load(infile) return base64url_decode(force_bytes(keyobj["k"])) @@ -31,24 +31,26 @@ def load_hmac_key(): if has_crypto: def load_rsa_key(): - with open(os.path.join(BASE_PATH, "jwk_rsa_key.json")) as infile: + with open(os.path.join(BASE_PATH, "jwk_rsa_key.json"), "r") as infile: return RSAAlgorithm.from_jwk(infile.read()) def load_rsa_pub_key(): - with open(os.path.join(BASE_PATH, "jwk_rsa_pub.json")) as infile: + with open(os.path.join(BASE_PATH, "jwk_rsa_pub.json"), "r") as infile: return RSAAlgorithm.from_jwk(infile.read()) def load_ec_key(): - with open(os.path.join(BASE_PATH, "jwk_ec_key.json")) as infile: + with open(os.path.join(BASE_PATH, "jwk_ec_key.json"), "r") as infile: keyobj = json.load(infile) return ec.EllipticCurvePrivateNumbers( private_value=decode_value(keyobj["d"]), - public_numbers=load_ec_pub_key().public_numbers(), + public_numbers=load_ec_pub_key_p_521().public_numbers(), ) - def load_ec_pub_key(): - with open(os.path.join(BASE_PATH, "jwk_ec_pub.json")) as infile: + def load_ec_pub_key_p_521(): + with open( + os.path.join(BASE_PATH, "jwk_ec_pub_P-521.json"), "r" + ) as infile: keyobj = json.load(infile) return ec.EllipticCurvePublicNumbers( diff --git a/tests/keys/jwk_ec_key.json b/tests/keys/jwk_ec_key.json deleted file mode 100644 index a7fa999e..00000000 --- a/tests/keys/jwk_ec_key.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "kty": "EC", - "kid": "bilbo.baggins@hobbiton.example", - "use": "sig", - "crv": "P-521", - "x": "AHKZLLOsCOzz5cY97ewNUajB957y-C-U88c3v13nmGZx6sYl_oJXu9A5RkTKqjqvjyekWF-7ytDyRXYgCF5cj0Kt", - "y": "AdymlHvOiLxXkEhayXQnNCvDX4h9htZaCJN34kfmC6pV5OhQHiraVySsUdaQkAgDPrwQrJmbnX9cwlGfP-HqHZR1", - "d": "AAhRON2r9cqXX1hg-RoI6R1tX5p2rUAYdmpHZoC1XNM56KtscrX6zbKipQrCW9CGZH3T4ubpnoTKLDYJ_fF3_rJt" -} diff --git a/tests/keys/jwk_ec_pub.json b/tests/keys/jwk_ec_pub.json deleted file mode 100644 index 5259ceb7..00000000 --- a/tests/keys/jwk_ec_pub.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "kty": "EC", - "kid": "bilbo.baggins@hobbiton.example", - "use": "sig", - "crv": "P-521", - "x": "AHKZLLOsCOzz5cY97ewNUajB957y-C-U88c3v13nmGZx6sYl_oJXu9A5RkTKqjqvjyekWF-7ytDyRXYgCF5cj0Kt", - "y": "AdymlHvOiLxXkEhayXQnNCvDX4h9htZaCJN34kfmC6pV5OhQHiraVySsUdaQkAgDPrwQrJmbnX9cwlGfP-HqHZR1" -} diff --git a/tests/keys/jwk_ec_pub_P-521.json b/tests/keys/jwk_ec_pub_P-521.json new file mode 100644 index 00000000..e624136e --- /dev/null +++ b/tests/keys/jwk_ec_pub_P-521.json @@ -0,0 +1,7 @@ +{ + "kty": "EC", + "kid": "bilbo.baggins.521@hobbiton.example", + "crv": "P-521", + "x": "AHKZLLOsCOzz5cY97ewNUajB957y-C-U88c3v13nmGZx6sYl_oJXu9A5RkTKqjqvjyekWF-7ytDyRXYgCF5cj0Kt", + "y": "AdymlHvOiLxXkEhayXQnNCvDX4h9htZaCJN34kfmC6pV5OhQHiraVySsUdaQkAgDPrwQrJmbnX9cwlGfP-HqHZR1" +} diff --git a/tests/keys/testkey_ec b/tests/keys/testkey_ec deleted file mode 100644 index fa93275f..00000000 --- a/tests/keys/testkey_ec +++ /dev/null @@ -1,7 +0,0 @@ ------BEGIN EC PRIVATE KEY----- -MIHbAgEBBEG4xN/z6gk7bPkEzs1hHOsbs+Gi2lku8YH4LkS4E1q9U9jSOjvEcFNH -m/CQjKi1rtpAb0/WL3p/wXsc26e7zmAA5KAHBgUrgQQAI6GBiQOBhgAEAVnCcDxA -J0v5OJBYFIcTReydEkEIWRvpzYMvv5l8IUOT2SFJiHdWtU45DV4is7+g6bbQanbh -28/1dBLR/kH1stAeAYWeTJ08gxo3M9Q0KinXsXm4c6G24UiGY6WHeWlOPKPa16fz -pwJ62o3XaRrCdGzX+K7TCwahWCTeizrJQAe8UwUY ------END EC PRIVATE KEY----- diff --git a/tests/keys/testkey_ec.priv b/tests/keys/testkey_ec.priv new file mode 100644 index 00000000..c7c0fb7c --- /dev/null +++ b/tests/keys/testkey_ec.priv @@ -0,0 +1,5 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQg2nninfu2jMHDwAbn +9oERUhRADS6duQaJEadybLaa0YShRANCAAQfMBxRZKUYEdy5/fLdGI2tYj6kTr50 +PZPt8jOD23rAR7dhtNpG1ojqopmH0AH5wEXadgk8nLCT4cAPK59Qp9Ek +-----END PRIVATE KEY----- diff --git a/tests/keys/testkey_ec.pub b/tests/keys/testkey_ec.pub index 7cd226c7..fe75697d 100644 --- a/tests/keys/testkey_ec.pub +++ b/tests/keys/testkey_ec.pub @@ -1,6 +1,4 @@ -----BEGIN PUBLIC KEY----- -MIGbMBAGByqGSM49AgEGBSuBBAAjA4GGAAQBWcJwPEAnS/k4kFgUhxNF7J0SQQhZ -G+nNgy+/mXwhQ5PZIUmId1a1TjkNXiKzv6DpttBqduHbz/V0EtH+QfWy0B4BhZ5M -nTyDGjcz1DQqKdexebhzobbhSIZjpYd5aU48o9rXp/OnAnrajddpGsJ0bNf4rtML -BqFYJN6LOslAB7xTBRg= +MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEHzAcUWSlGBHcuf3y3RiNrWI+pE6+ +dD2T7fIzg9t6wEe3YbTaRtaI6qKZh9AB+cBF2nYJPJywk+HADyufUKfRJA== -----END PUBLIC KEY----- diff --git a/tests/keys/testkey_ec_ssh.pub b/tests/keys/testkey_ec_ssh.pub index 4fa3a6bb..4a6428e6 100644 --- a/tests/keys/testkey_ec_ssh.pub +++ b/tests/keys/testkey_ec_ssh.pub @@ -1 +1 @@ -ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBAFZwnA8QCdL+TiQWBSHE0XsnRJBCFkb6c2DL7+ZfCFDk9khSYh3VrVOOQ1eIrO/oOm20Gp24dvP9XQS0f5B9bLQHgGFnkydPIMaNzPUNCop17F5uHOhtuFIhmOlh3lpTjyj2ten86cCetqN12kawnRs1/iu0wsGoVgk3os6yUAHvFMFGA== +ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBB8wHFFkpRgR3Ln98t0Yja1iPqROvnQ9k+3yM4PbesBHt2G02kbWiOqimYfQAfnARdp2CTycsJPhwA8rn1Cn0SQ= diff --git a/tests/keys/testkey_rsa b/tests/keys/testkey_rsa.priv similarity index 100% rename from tests/keys/testkey_rsa rename to tests/keys/testkey_rsa.priv diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py index 4af4e6ed..2a43f159 100644 --- a/tests/test_algorithms.py +++ b/tests/test_algorithms.py @@ -17,7 +17,7 @@ RSAPSSAlgorithm, Ed25519Algorithm, ) - from .keys import load_rsa_pub_key, load_ec_pub_key + from .keys import load_rsa_pub_key, load_ec_pub_key_p_521 has_crypto = True except ImportError: @@ -79,41 +79,41 @@ def test_hmac_should_throw_exception_if_key_is_pem_public_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path("testkey2_rsa.pub.pem")) as keyfile: + with open(key_path("testkey2_rsa.pub.pem"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_should_throw_exception_if_key_is_x509_certificate(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path("testkey_rsa.cer")) as keyfile: + with open(key_path("testkey_rsa.cer"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_should_throw_exception_if_key_is_ssh_public_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path("testkey_rsa.pub")) as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_should_throw_exception_if_key_is_x509_cert(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path("testkey2_rsa.pub.pem")) as keyfile: + with open(key_path("testkey2_rsa.pub.pem"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_should_throw_exception_if_key_is_pkcs1_pem_public(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) with pytest.raises(InvalidKeyError): - with open(key_path("testkey_pkcs1.pub.pem")) as keyfile: + with open(key_path("testkey_pkcs1.pub.pem"), "r") as keyfile: algo.prepare_key(keyfile.read()) def test_hmac_jwk_should_parse_and_verify(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) - with open(key_path("jwk_hmac.json")) as keyfile: + with open(key_path("jwk_hmac.json"), "r") as keyfile: key = algo.from_jwk(keyfile.read()) signature = algo.sign(b"Hello World!", key) @@ -128,7 +128,7 @@ def test_hmac_to_jwk_returns_correct_values(self): def test_hmac_from_jwk_should_raise_exception_if_not_hmac_key(self): algo = HMACAlgorithm(HMACAlgorithm.SHA256) - with open(key_path("jwk_rsa_pub.json")) as keyfile: + with open(key_path("jwk_rsa_pub.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): algo.from_jwk(keyfile.read()) @@ -138,7 +138,7 @@ def test_hmac_from_jwk_should_raise_exception_if_not_hmac_key(self): def test_rsa_should_parse_pem_public_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("testkey2_rsa.pub.pem")) as pem_key: + with open(key_path("testkey2_rsa.pub.pem"), "r") as pem_key: algo.prepare_key(pem_key.read()) @pytest.mark.skipif( @@ -147,7 +147,7 @@ def test_rsa_should_parse_pem_public_key(self): def test_rsa_should_accept_pem_private_key_bytes(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("testkey_rsa"), "rb") as pem_key: + with open(key_path("testkey_rsa.priv"), "rb") as pem_key: algo.prepare_key(pem_key.read()) @pytest.mark.skipif( @@ -156,7 +156,7 @@ def test_rsa_should_accept_pem_private_key_bytes(self): def test_rsa_should_accept_unicode_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("testkey_rsa")) as rsa_key: + with open(key_path("testkey_rsa.priv"), "r") as rsa_key: algo.prepare_key(force_unicode(rsa_key.read())) @pytest.mark.skipif( @@ -189,7 +189,7 @@ def test_rsa_verify_should_return_false_if_signature_invalid(self): sig += force_bytes("123") # Signature is now invalid - with open(key_path("testkey_rsa.pub")) as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(message, pub_key, sig) @@ -201,10 +201,10 @@ def test_rsa_verify_should_return_false_if_signature_invalid(self): def test_rsa_jwk_public_and_private_keys_should_parse_and_verify(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("jwk_rsa_pub.json")) as keyfile: + with open(key_path("jwk_rsa_pub.json"), "r") as keyfile: pub_key = algo.from_jwk(keyfile.read()) - with open(key_path("jwk_rsa_key.json")) as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: priv_key = algo.from_jwk(keyfile.read()) signature = algo.sign(force_bytes("Hello World!"), priv_key) @@ -216,7 +216,7 @@ def test_rsa_jwk_public_and_private_keys_should_parse_and_verify(self): def test_rsa_private_key_to_jwk_works_with_from_jwk(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("testkey_rsa")) as rsa_key: + with open(key_path("testkey_rsa.priv"), "r") as rsa_key: orig_key = algo.prepare_key(force_unicode(rsa_key.read())) parsed_key = algo.from_jwk(algo.to_jwk(orig_key)) @@ -232,7 +232,7 @@ def test_rsa_private_key_to_jwk_works_with_from_jwk(self): def test_rsa_public_key_to_jwk_works_with_from_jwk(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("testkey_rsa.pub")) as rsa_key: + with open(key_path("testkey_rsa.pub"), "r") as rsa_key: orig_key = algo.prepare_key(force_unicode(rsa_key.read())) parsed_key = algo.from_jwk(algo.to_jwk(orig_key)) @@ -244,7 +244,7 @@ def test_rsa_public_key_to_jwk_works_with_from_jwk(self): def test_rsa_jwk_private_key_with_other_primes_is_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("jwk_rsa_key.json")) as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): keydata = json.loads(keyfile.read()) keydata["oth"] = [] @@ -257,7 +257,7 @@ def test_rsa_jwk_private_key_with_other_primes_is_invalid(self): def test_rsa_jwk_private_key_with_missing_values_is_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("jwk_rsa_key.json")) as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): keydata = json.loads(keyfile.read()) del keydata["p"] @@ -270,7 +270,7 @@ def test_rsa_jwk_private_key_with_missing_values_is_invalid(self): def test_rsa_jwk_private_key_can_recover_prime_factors(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("jwk_rsa_key.json")) as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: keybytes = keyfile.read() control_key = algo.from_jwk(keybytes).private_numbers() @@ -294,7 +294,7 @@ def test_rsa_jwk_private_key_can_recover_prime_factors(self): def test_rsa_jwk_private_key_with_missing_required_values_is_invalid(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("jwk_rsa_key.json")) as keyfile: + with open(key_path("jwk_rsa_key.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): keydata = json.loads(keyfile.read()) del keydata["p"] @@ -321,7 +321,7 @@ def test_rsa_jwk_raises_exception_if_not_a_valid_key(self): def test_rsa_to_jwk_returns_correct_values_for_public_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("testkey_rsa.pub")) as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) key = algo.to_jwk(pub_key) @@ -347,13 +347,13 @@ def test_rsa_to_jwk_returns_correct_values_for_public_key(self): def test_rsa_to_jwk_returns_correct_values_for_private_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("testkey_rsa")) as keyfile: + with open(key_path("testkey_rsa.priv"), "r") as keyfile: priv_key = algo.prepare_key(keyfile.read()) key = algo.to_jwk(priv_key) expected = { - "key_ops": ["sign"], + "key_ops": [u"sign"], "kty": "RSA", "e": "AQAB", "n": ( @@ -415,7 +415,7 @@ def test_rsa_to_jwk_raises_exception_on_invalid_key(self): def test_rsa_from_jwk_raises_exception_on_invalid_key(self): algo = RSAAlgorithm(RSAAlgorithm.SHA256) - with open(key_path("jwk_hmac.json")) as keyfile: + with open(key_path("jwk_hmac.json"), "r") as keyfile: with pytest.raises(InvalidKeyError): algo.from_jwk(keyfile.read()) @@ -428,22 +428,13 @@ def test_ec_should_reject_non_string_key(self): with pytest.raises(TypeError): algo.prepare_key(None) - @pytest.mark.skipif( - not has_crypto, reason="Not supported without cryptography library" - ) - def test_ec_should_accept_unicode_key(self): - algo = ECAlgorithm(ECAlgorithm.SHA256) - - with open(key_path("testkey_ec")) as ec_key: - algo.prepare_key(force_unicode(ec_key.read())) - @pytest.mark.skipif( not has_crypto, reason="Not supported without cryptography library" ) def test_ec_should_accept_pem_private_key_bytes(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - with open(key_path("testkey_ec"), "rb") as ec_key: + with open(key_path("testkey_ec.priv"), "rb") as ec_key: algo.prepare_key(ec_key.read()) @pytest.mark.skipif( @@ -452,7 +443,7 @@ def test_ec_should_accept_pem_private_key_bytes(self): def test_ec_should_accept_ssh_public_key_bytes(self): algo = ECAlgorithm(ECAlgorithm.SHA256) - with open(key_path("testkey_ec_ssh.pub")) as ec_key: + with open(key_path("testkey_ec_ssh.pub"), "r") as ec_key: algo.prepare_key(ec_key.read()) @pytest.mark.skipif( @@ -474,7 +465,7 @@ def test_ec_verify_should_return_false_if_signature_invalid(self): ) ) - with open(key_path("testkey_ec.pub")) as keyfile: + with open(key_path("testkey_ec.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(message, pub_key, sig) @@ -490,7 +481,7 @@ def test_ec_verify_should_return_false_if_signature_wrong_length(self): sig = base64.b64decode(force_bytes("AC+m4Jf/xI3guAC6w0w3")) - with open(key_path("testkey_ec.pub")) as keyfile: + with open(key_path("testkey_ec.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(message, pub_key, sig) @@ -504,11 +495,11 @@ def test_rsa_pss_sign_then_verify_should_return_true(self): message = force_bytes("Hello World!") - with open(key_path("testkey_rsa")) as keyfile: + with open(key_path("testkey_rsa.priv"), "r") as keyfile: priv_key = algo.prepare_key(keyfile.read()) sig = algo.sign(message, priv_key) - with open(key_path("testkey_rsa.pub")) as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(message, pub_key, sig) @@ -535,7 +526,7 @@ def test_rsa_pss_verify_should_return_false_if_signature_invalid(self): jwt_sig += force_bytes("123") # Signature is now invalid - with open(key_path("testkey_rsa.pub")) as keyfile: + with open(key_path("testkey_rsa.pub"), "r") as keyfile: jwt_pub_key = algo.prepare_key(keyfile.read()) result = algo.verify(jwt_message, jwt_pub_key, jwt_sig) @@ -670,7 +661,7 @@ def test_ec_verify_should_return_true_for_test_vector(self): ) algo = ECAlgorithm(ECAlgorithm.SHA512) - key = algo.prepare_key(load_ec_pub_key()) + key = algo.prepare_key(load_ec_pub_key_p_521()) result = algo.verify(signing_input, key, signature) assert result diff --git a/tests/test_api_jws.py b/tests/test_api_jws.py index 89b273c6..df9551c2 100644 --- a/tests/test_api_jws.py +++ b/tests/test_api_jws.py @@ -88,8 +88,8 @@ def test_options_must_be_dict(self, jws): def test_encode_decode(self, jws, payload): secret = "secret" - jws_message = jws.encode(payload, secret) - decoded_payload = jws.decode(jws_message, secret) + jws_message = jws.encode(payload, secret, algorithm="HS256") + decoded_payload = jws.decode(jws_message, secret, algorithms=["HS256"]) assert decoded_payload == payload @@ -98,7 +98,7 @@ def test_decode_fails_when_alg_is_not_on_method_algorithms_param( ): secret = "secret" jws_token = jws.encode(payload, secret, algorithm="HS256") - jws.decode(jws_token, secret) + jws.decode(jws_token, secret, algorithms=["HS256"]) with pytest.raises(InvalidAlgorithmError): jws.decode(jws_token, secret, algorithms=["HS384"]) @@ -111,7 +111,7 @@ def test_decode_works_with_unicode_token(self, jws): ".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" ) - jws.decode(unicode_jws, secret) + jws.decode(unicode_jws, secret, algorithms=["HS256"]) def test_decode_missing_segments_throws_exception(self, jws): secret = "secret" @@ -122,7 +122,7 @@ def test_decode_missing_segments_throws_exception(self, jws): ) # Missing segment with pytest.raises(DecodeError) as context: - jws.decode(example_jws, secret) + jws.decode(example_jws, secret, algorithms=["HS256"]) exception = context.value assert str(exception) == "Not enough segments" @@ -132,7 +132,7 @@ def test_decode_invalid_token_type_is_none(self, jws): example_secret = "secret" with pytest.raises(DecodeError) as context: - jws.decode(example_jws, example_secret) + jws.decode(example_jws, example_secret, algorithms=["HS256"]) exception = context.value assert "Invalid token type" in str(exception) @@ -142,7 +142,7 @@ def test_decode_invalid_token_type_is_int(self, jws): example_secret = "secret" with pytest.raises(DecodeError) as context: - jws.decode(example_jws, example_secret) + jws.decode(example_jws, example_secret, algorithms=["HS256"]) exception = context.value assert "Invalid token type" in str(exception) @@ -156,7 +156,7 @@ def test_decode_with_non_mapping_header_throws_exception(self, jws): ) with pytest.raises(DecodeError) as context: - jws.decode(example_jws, secret) + jws.decode(example_jws, secret, algorithms=["HS256"]) exception = context.value assert str(exception) == "Invalid header string: must be a json object" @@ -181,7 +181,7 @@ def test_decode_algorithm_param_should_be_case_sensitive(self, jws): ) with pytest.raises(InvalidAlgorithmError) as context: - jws.decode(example_jws, "secret") + jws.decode(example_jws, "secret", algorithms=["hs256"]) exception = context.value assert str(exception) == "Algorithm not supported" @@ -193,11 +193,11 @@ def test_bad_secret(self, jws, payload): with pytest.raises(DecodeError) as excinfo: # Backward compat for ticket #315 - jws.decode(jws_message, bad_secret) + jws.decode(jws_message, bad_secret, algorithms=["HS256"]) assert "Signature verification failed" == str(excinfo.value) with pytest.raises(InvalidSignatureError) as excinfo: - jws.decode(jws_message, bad_secret) + jws.decode(jws_message, bad_secret, algorithms=["HS256"]) assert "Signature verification failed" == str(excinfo.value) def test_decodes_valid_jws(self, jws, payload): @@ -208,7 +208,9 @@ def test_decodes_valid_jws(self, jws, payload): b"gEW0pdU4kxPthjtehYdhxB9mMOGajt1xCKlGGXDJ8PM" ) - decoded_payload = jws.decode(example_jws, example_secret) + decoded_payload = jws.decode( + example_jws, example_secret, algorithms=["HS256"] + ) assert decoded_payload == payload @@ -221,17 +223,16 @@ def test_decodes_valid_jws(self, jws, payload): ) def test_decodes_valid_es384_jws(self, jws): example_payload = {"hello": "world"} - with open("tests/keys/testkey_ec.pub") as fp: + with open("tests/keys/testkey_ec.pub", "r") as fp: example_pubkey = fp.read() example_jws = ( - b"eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9" - b".eyJoZWxsbyI6IndvcmxkIn0" - b".AGtlemKghaIaYh1yeeekFH9fRuNY7hCaw5hUgZ5aG1N" - b"2F8FIbiKLaZKr8SiFdTimXFVTEmxpBQ9sRmdsDsnrM-1" - b"HAG0_zxxu0JyINOFT2iqF3URYl9HZ8kZWMeZAtXmn6Cw" - b"PXRJD2f7N-f7bJ5JeL9VT5beI2XD3FlK3GgRvI-eE-2Ik" + b"eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9." + b"eyJoZWxsbyI6IndvcmxkIn0.TORyNQab_MoXM7DvNKaTwbrJr4UY" + b"d2SsX8hhlnWelQFmPFSf_JzC2EbLnar92t-bXsDovzxp25ExazrVHkfPkQ" + ) + decoded_payload = jws.decode( + example_jws, example_pubkey, algorithms=["ES256"] ) - decoded_payload = jws.decode(example_jws, example_pubkey) json_payload = json.loads(force_unicode(decoded_payload)) assert json_payload == example_payload @@ -245,7 +246,7 @@ def test_decodes_valid_es384_jws(self, jws): ) def test_decodes_valid_rs384_jws(self, jws): example_payload = {"hello": "world"} - with open("tests/keys/testkey_rsa.pub") as fp: + with open("tests/keys/testkey_rsa.pub", "r") as fp: example_pubkey = fp.read() example_jws = ( b"eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9" @@ -259,7 +260,9 @@ def test_decodes_valid_rs384_jws(self, jws): b"uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr" b"qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A" ) - decoded_payload = jws.decode(example_jws, example_pubkey) + decoded_payload = jws.decode( + example_jws, example_pubkey, algorithms=["RS384"] + ) json_payload = json.loads(force_unicode(decoded_payload)) assert json_payload == example_payload @@ -272,24 +275,19 @@ def test_load_verify_valid_jws(self, jws, payload): b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI" ) - decoded_payload = jws.decode(example_jws, key=example_secret) + decoded_payload = jws.decode( + example_jws, key=example_secret, algorithms=["HS256"] + ) assert decoded_payload == payload def test_allow_skip_verification(self, jws, payload): right_secret = "foo" jws_message = jws.encode(payload, right_secret) - decoded_payload = jws.decode(jws_message, verify=False) - - assert decoded_payload == payload - - def test_verify_false_deprecated(self, jws, recwarn): - example_jws = ( - b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9" - b".eyJoZWxsbyI6ICJ3b3JsZCJ9" - b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" + decoded_payload = jws.decode( + jws_message, options={"verify_signature": False} ) - pytest.deprecated_call(jws.decode, example_jws, verify=False) + assert decoded_payload == payload def test_decode_with_optional_algorithms(self, jws): example_secret = "secret" @@ -299,7 +297,13 @@ def test_decode_with_optional_algorithms(self, jws): b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI" ) - pytest.deprecated_call(jws.decode, example_jws, key=example_secret) + with pytest.raises(DecodeError) as exc: + jws.decode(example_jws, key=example_secret) + + assert ( + 'It is required that you pass in a value for the "algorithms" argument when calling decode().' + in str(exc.value) + ) def test_decode_no_algorithms_verify_signature_false(self, jws): example_secret = "secret" @@ -309,23 +313,22 @@ def test_decode_no_algorithms_verify_signature_false(self, jws): b"SIr03zM64awWRdPrAM_61QWsZchAtgDV3pphfHPPWkI" ) - try: - pytest.deprecated_call( - jws.decode, - example_jws, - key=example_secret, - options={"verify_signature": False}, - ) - except pytest.fail.Exception: - pass - else: - assert False, "Unexpected DeprecationWarning raised." + jws.decode( + example_jws, + key=example_secret, + options={"verify_signature": False}, + ) def test_load_no_verification(self, jws, payload): right_secret = "foo" jws_message = jws.encode(payload, right_secret) - decoded_payload = jws.decode(jws_message, key=None, verify=False) + decoded_payload = jws.decode( + jws_message, + key=None, + algorithms=["HS256"], + options={"verify_signature": False}, + ) assert decoded_payload == payload @@ -334,14 +337,14 @@ def test_no_secret(self, jws, payload): jws_message = jws.encode(payload, right_secret) with pytest.raises(DecodeError): - jws.decode(jws_message) + jws.decode(jws_message, algorithms=["HS256"]) def test_verify_signature_with_no_secret(self, jws, payload): right_secret = "foo" jws_message = jws.encode(payload, right_secret) with pytest.raises(DecodeError) as exc: - jws.decode(jws_message) + jws.decode(jws_message, algorithms=["HS256"]) assert "Signature verification" in str(exc.value) @@ -355,7 +358,7 @@ def test_verify_signature_with_no_algo_header_throws_exception( ) with pytest.raises(InvalidAlgorithmError): - jws.decode(example_jws, "secret") + jws.decode(example_jws, "secret", algorithms=["HS256"]) def test_invalid_crypto_alg(self, jws, payload): with pytest.raises(NotImplementedError): @@ -372,7 +375,7 @@ def test_missing_crypto_library_better_error_messages(self, jws, payload): def test_unicode_secret(self, jws, payload): secret = "\xc2" jws_message = jws.encode(payload, secret) - decoded_payload = jws.decode(jws_message, secret) + decoded_payload = jws.decode(jws_message, secret, algorithms=["HS256"]) assert decoded_payload == payload @@ -380,7 +383,7 @@ def test_nonascii_secret(self, jws, payload): secret = "\xc2" # char value that ascii codec cannot decode jws_message = jws.encode(payload, secret) - decoded_payload = jws.decode(jws_message, secret) + decoded_payload = jws.decode(jws_message, secret, algorithms=["HS256"]) assert decoded_payload == payload @@ -388,7 +391,7 @@ def test_bytes_secret(self, jws, payload): secret = b"\xc2" # char value that ascii codec cannot decode jws_message = jws.encode(payload, secret) - decoded_payload = jws.decode(jws_message, secret) + decoded_payload = jws.decode(jws_message, secret, algorithms=["HS256"]) assert decoded_payload == payload @@ -401,7 +404,7 @@ def test_decode_invalid_header_padding(self, jws): example_secret = "secret" with pytest.raises(DecodeError) as exc: - jws.decode(example_jws, example_secret) + jws.decode(example_jws, example_secret, algorithms=["HS256"]) assert "header padding" in str(exc.value) @@ -414,7 +417,7 @@ def test_decode_invalid_header_string(self, jws): example_secret = "secret" with pytest.raises(DecodeError) as exc: - jws.decode(example_jws, example_secret) + jws.decode(example_jws, example_secret, algorithms=["HS256"]) assert "Invalid header" in str(exc.value) @@ -427,7 +430,7 @@ def test_decode_invalid_payload_padding(self, jws): example_secret = "secret" with pytest.raises(DecodeError) as exc: - jws.decode(example_jws, example_secret) + jws.decode(example_jws, example_secret, algorithms=["HS256"]) assert "Invalid payload padding" in str(exc.value) @@ -440,7 +443,7 @@ def test_decode_invalid_crypto_padding(self, jws): example_secret = "secret" with pytest.raises(DecodeError) as exc: - jws.decode(example_jws, example_secret) + jws.decode(example_jws, example_secret, algorithms=["HS256"]) assert "Invalid crypto padding" in str(exc.value) @@ -448,13 +451,13 @@ def test_decode_with_algo_none_should_fail(self, jws, payload): jws_message = jws.encode(payload, key=None, algorithm=None) with pytest.raises(DecodeError): - jws.decode(jws_message) + jws.decode(jws_message, algorithms=["none"]) def test_decode_with_algo_none_and_verify_false_should_pass( self, jws, payload ): jws_message = jws.encode(payload, key=None, algorithm=None) - jws.decode(jws_message, verify=False) + jws.decode(jws_message, options={"verify_signature": False}) def test_get_unverified_header_returns_header_values(self, jws, payload): jws_message = jws.encode( @@ -489,7 +492,7 @@ def test_get_unverified_header_fails_on_bad_header_types( ) def test_encode_decode_with_rsa_sha256(self, jws, payload): # PEM-formatted RSA key - with open("tests/keys/testkey_rsa") as rsa_priv_file: + with open("tests/keys/testkey_rsa.priv", "r") as rsa_priv_file: priv_rsakey = load_pem_private_key( force_bytes(rsa_priv_file.read()), password=None, @@ -497,28 +500,28 @@ def test_encode_decode_with_rsa_sha256(self, jws, payload): ) jws_message = jws.encode(payload, priv_rsakey, algorithm="RS256") - with open("tests/keys/testkey_rsa.pub") as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = load_ssh_public_key( force_bytes(rsa_pub_file.read()), backend=default_backend() ) - jws.decode(jws_message, pub_rsakey) + jws.decode(jws_message, pub_rsakey, algorithms=["RS256"]) # string-formatted key - with open("tests/keys/testkey_rsa") as rsa_priv_file: + with open("tests/keys/testkey_rsa.priv", "r") as rsa_priv_file: priv_rsakey = rsa_priv_file.read() jws_message = jws.encode(payload, priv_rsakey, algorithm="RS256") - with open("tests/keys/testkey_rsa.pub") as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = rsa_pub_file.read() - jws.decode(jws_message, pub_rsakey) + jws.decode(jws_message, pub_rsakey, algorithms=["RS256"]) @pytest.mark.skipif( not has_crypto, reason="Not supported without cryptography library" ) def test_encode_decode_with_rsa_sha384(self, jws, payload): # PEM-formatted RSA key - with open("tests/keys/testkey_rsa") as rsa_priv_file: + with open("tests/keys/testkey_rsa.priv", "r") as rsa_priv_file: priv_rsakey = load_pem_private_key( force_bytes(rsa_priv_file.read()), password=None, @@ -526,27 +529,27 @@ def test_encode_decode_with_rsa_sha384(self, jws, payload): ) jws_message = jws.encode(payload, priv_rsakey, algorithm="RS384") - with open("tests/keys/testkey_rsa.pub") as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = load_ssh_public_key( force_bytes(rsa_pub_file.read()), backend=default_backend() ) - jws.decode(jws_message, pub_rsakey) + jws.decode(jws_message, pub_rsakey, algorithms=["RS384"]) # string-formatted key - with open("tests/keys/testkey_rsa") as rsa_priv_file: + with open("tests/keys/testkey_rsa.priv", "r") as rsa_priv_file: priv_rsakey = rsa_priv_file.read() jws_message = jws.encode(payload, priv_rsakey, algorithm="RS384") - with open("tests/keys/testkey_rsa.pub") as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = rsa_pub_file.read() - jws.decode(jws_message, pub_rsakey) + jws.decode(jws_message, pub_rsakey, algorithms=["RS384"]) @pytest.mark.skipif( not has_crypto, reason="Not supported without cryptography library" ) def test_encode_decode_with_rsa_sha512(self, jws, payload): # PEM-formatted RSA key - with open("tests/keys/testkey_rsa") as rsa_priv_file: + with open("tests/keys/testkey_rsa.priv", "r") as rsa_priv_file: priv_rsakey = load_pem_private_key( force_bytes(rsa_priv_file.read()), password=None, @@ -554,20 +557,20 @@ def test_encode_decode_with_rsa_sha512(self, jws, payload): ) jws_message = jws.encode(payload, priv_rsakey, algorithm="RS512") - with open("tests/keys/testkey_rsa.pub") as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = load_ssh_public_key( force_bytes(rsa_pub_file.read()), backend=default_backend() ) - jws.decode(jws_message, pub_rsakey) + jws.decode(jws_message, pub_rsakey, algorithms=["RS512"]) # string-formatted key - with open("tests/keys/testkey_rsa") as rsa_priv_file: + with open("tests/keys/testkey_rsa.priv", "r") as rsa_priv_file: priv_rsakey = rsa_priv_file.read() jws_message = jws.encode(payload, priv_rsakey, algorithm="RS512") - with open("tests/keys/testkey_rsa.pub") as rsa_pub_file: + with open("tests/keys/testkey_rsa.pub", "r") as rsa_pub_file: pub_rsakey = rsa_pub_file.read() - jws.decode(jws_message, pub_rsakey) + jws.decode(jws_message, pub_rsakey, algorithms=["RS512"]) def test_rsa_related_algorithms(self, jws): jws = PyJWS() @@ -594,7 +597,7 @@ def test_rsa_related_algorithms(self, jws): ) def test_encode_decode_with_ecdsa_sha256(self, jws, payload): # PEM-formatted EC key - with open("tests/keys/testkey_ec") as ec_priv_file: + with open("tests/keys/testkey_ec.priv", "r") as ec_priv_file: priv_eckey = load_pem_private_key( force_bytes(ec_priv_file.read()), password=None, @@ -602,20 +605,20 @@ def test_encode_decode_with_ecdsa_sha256(self, jws, payload): ) jws_message = jws.encode(payload, priv_eckey, algorithm="ES256") - with open("tests/keys/testkey_ec.pub") as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = load_pem_public_key( force_bytes(ec_pub_file.read()), backend=default_backend() ) - jws.decode(jws_message, pub_eckey) + jws.decode(jws_message, pub_eckey, algorithms=["ES256"]) # string-formatted key - with open("tests/keys/testkey_ec") as ec_priv_file: + with open("tests/keys/testkey_ec.priv", "r") as ec_priv_file: priv_eckey = ec_priv_file.read() jws_message = jws.encode(payload, priv_eckey, algorithm="ES256") - with open("tests/keys/testkey_ec.pub") as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = ec_pub_file.read() - jws.decode(jws_message, pub_eckey) + jws.decode(jws_message, pub_eckey, algorithms=["ES256"]) @pytest.mark.skipif( not has_crypto, reason="Can't run without cryptography library" @@ -623,7 +626,7 @@ def test_encode_decode_with_ecdsa_sha256(self, jws, payload): def test_encode_decode_with_ecdsa_sha384(self, jws, payload): # PEM-formatted EC key - with open("tests/keys/testkey_ec") as ec_priv_file: + with open("tests/keys/testkey_ec.priv", "r") as ec_priv_file: priv_eckey = load_pem_private_key( force_bytes(ec_priv_file.read()), password=None, @@ -631,48 +634,48 @@ def test_encode_decode_with_ecdsa_sha384(self, jws, payload): ) jws_message = jws.encode(payload, priv_eckey, algorithm="ES384") - with open("tests/keys/testkey_ec.pub") as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = load_pem_public_key( force_bytes(ec_pub_file.read()), backend=default_backend() ) - jws.decode(jws_message, pub_eckey) + jws.decode(jws_message, pub_eckey, algorithms=["ES384"]) # string-formatted key - with open("tests/keys/testkey_ec") as ec_priv_file: + with open("tests/keys/testkey_ec.priv", "r") as ec_priv_file: priv_eckey = ec_priv_file.read() jws_message = jws.encode(payload, priv_eckey, algorithm="ES384") - with open("tests/keys/testkey_ec.pub") as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = ec_pub_file.read() - jws.decode(jws_message, pub_eckey) + jws.decode(jws_message, pub_eckey, algorithms=["ES384"]) @pytest.mark.skipif( not has_crypto, reason="Can't run without cryptography library" ) def test_encode_decode_with_ecdsa_sha512(self, jws, payload): # PEM-formatted EC key - with open("tests/keys/testkey_ec") as ec_priv_file: + with open("tests/keys/testkey_ec.priv", "r") as ec_priv_file: priv_eckey = load_pem_private_key( force_bytes(ec_priv_file.read()), password=None, backend=default_backend(), ) - jws_message = jws.encode(payload, priv_eckey, algorithm="ES521") + jws_message = jws.encode(payload, priv_eckey, algorithm="ES512") - with open("tests/keys/testkey_ec.pub") as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = load_pem_public_key( force_bytes(ec_pub_file.read()), backend=default_backend() ) - jws.decode(jws_message, pub_eckey) + jws.decode(jws_message, pub_eckey, algorithms=["ES512"]) # string-formatted key - with open("tests/keys/testkey_ec") as ec_priv_file: + with open("tests/keys/testkey_ec.priv", "r") as ec_priv_file: priv_eckey = ec_priv_file.read() - jws_message = jws.encode(payload, priv_eckey, algorithm="ES521") + jws_message = jws.encode(payload, priv_eckey, algorithm="ES512") - with open("tests/keys/testkey_ec.pub") as ec_pub_file: + with open("tests/keys/testkey_ec.pub", "r") as ec_pub_file: pub_eckey = ec_pub_file.read() - jws.decode(jws_message, pub_eckey) + jws.decode(jws_message, pub_eckey, algorithms=["ES512"]) def test_ecdsa_related_algorithms(self, jws): jws = PyJWS() @@ -681,11 +684,11 @@ def test_ecdsa_related_algorithms(self, jws): if has_crypto: assert "ES256" in jws_algorithms assert "ES384" in jws_algorithms - assert "ES521" in jws_algorithms + assert "ES512" in jws_algorithms else: assert "ES256" not in jws_algorithms assert "ES384" not in jws_algorithms - assert "ES521" not in jws_algorithms + assert "ES512" not in jws_algorithms def test_skip_check_signature(self, jws): token = ( @@ -709,7 +712,7 @@ class CustomJSONEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, Decimal): return "it worked" - return super().default(o) + return super(CustomJSONEncoder, self).default(o) data = {"some_decimal": Decimal("2.2")} diff --git a/tests/test_api_jwt.py b/tests/test_api_jwt.py index 50c748b5..8fdb80c8 100644 --- a/tests/test_api_jwt.py +++ b/tests/test_api_jwt.py @@ -41,7 +41,9 @@ def test_decodes_valid_jwt(self, jwt): b".eyJoZWxsbyI6ICJ3b3JsZCJ9" b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" ) - decoded_payload = jwt.decode(example_jwt, example_secret) + decoded_payload = jwt.decode( + example_jwt, example_secret, algorithms=["HS256"] + ) assert decoded_payload == example_payload @@ -54,7 +56,9 @@ def test_load_verify_valid_jwt(self, jwt): b".tvagLDLoaiJKxOKqpBXSEGy7SYSifZhjntgm9ctpyj8" ) - decoded_payload = jwt.decode(example_jwt, key=example_secret) + decoded_payload = jwt.decode( + example_jwt, key=example_secret, algorithms=["HS256"] + ) assert decoded_payload == example_payload @@ -67,7 +71,7 @@ def test_decode_invalid_payload_string(self, jwt): example_secret = "secret" with pytest.raises(DecodeError) as exc: - jwt.decode(example_jwt, example_secret) + jwt.decode(example_jwt, example_secret, algorithms=["HS256"]) assert "Invalid payload string" in str(exc.value) @@ -80,7 +84,7 @@ def test_decode_with_non_mapping_payload_throws_exception(self, jwt): ) with pytest.raises(DecodeError) as context: - jwt.decode(example_jwt, secret) + jwt.decode(example_jwt, secret, algorithms=["HS256"]) exception = context.value assert ( @@ -96,10 +100,10 @@ def test_decode_with_invalid_audience_param_throws_exception(self, jwt): ) with pytest.raises(TypeError) as context: - jwt.decode(example_jwt, secret, audience=1) + jwt.decode(example_jwt, secret, audience=1, algorithms=["HS256"]) exception = context.value - assert str(exception) == "audience must be an iterable or None" + assert str(exception) == "audience must be a string, iterable, or None" def test_decode_with_nonlist_aud_claim_throws_exception(self, jwt): secret = "secret" @@ -110,7 +114,12 @@ def test_decode_with_nonlist_aud_claim_throws_exception(self, jwt): ) with pytest.raises(InvalidAudienceError) as context: - jwt.decode(example_jwt, secret, audience="my_audience") + jwt.decode( + example_jwt, + secret, + audience="my_audience", + algorithms=["HS256"], + ) exception = context.value assert str(exception) == "Invalid claim format in token" @@ -124,7 +133,12 @@ def test_decode_with_invalid_aud_list_member_throws_exception(self, jwt): ) with pytest.raises(InvalidAudienceError) as context: - jwt.decode(example_jwt, secret, audience="my_audience") + jwt.decode( + example_jwt, + secret, + audience="my_audience", + algorithms=["HS256"], + ) exception = context.value assert str(exception) == "Invalid claim format in token" @@ -134,7 +148,10 @@ def test_encode_bad_type(self, jwt): types = ["string", tuple(), list(), 42, set()] for t in types: - pytest.raises(TypeError, lambda: jwt.encode(t, "secret")) + pytest.raises( + TypeError, + lambda: jwt.encode(t, "secret", algorithms=["HS256"]), + ) def test_decode_raises_exception_if_exp_is_not_int(self, jwt): # >>> jwt.encode({'exp': 'not-an-int'}, 'secret') @@ -145,7 +162,7 @@ def test_decode_raises_exception_if_exp_is_not_int(self, jwt): ) with pytest.raises(DecodeError) as exc: - jwt.decode(example_jwt, "secret") + jwt.decode(example_jwt, "secret", algorithms=["HS256"]) assert "exp" in str(exc.value) @@ -158,7 +175,7 @@ def test_decode_raises_exception_if_iat_is_not_int(self, jwt): ) with pytest.raises(InvalidIssuedAtError): - jwt.decode(example_jwt, "secret") + jwt.decode(example_jwt, "secret", algorithms=["HS256"]) def test_decode_raises_exception_if_nbf_is_not_int(self, jwt): # >>> jwt.encode({'nbf': 'not-an-int'}, 'secret') @@ -169,7 +186,7 @@ def test_decode_raises_exception_if_nbf_is_not_int(self, jwt): ) with pytest.raises(DecodeError): - jwt.decode(example_jwt, "secret") + jwt.decode(example_jwt, "secret", algorithms=["HS256"]) def test_encode_datetime(self, jwt): secret = "secret" @@ -180,7 +197,9 @@ def test_encode_datetime(self, jwt): "nbf": current_datetime, } jwt_message = jwt.encode(payload, secret) - decoded_payload = jwt.decode(jwt_message, secret, leeway=1) + decoded_payload = jwt.decode( + jwt_message, secret, leeway=1, algorithms=["HS256"] + ) assert decoded_payload["exp"] == timegm( current_datetime.utctimetuple() @@ -199,20 +218,19 @@ def test_encode_datetime(self, jwt): @pytest.mark.skipif( not has_crypto, reason="Can't run without cryptography library" ) - def test_decodes_valid_es384_jwt(self, jwt): + def test_decodes_valid_es256_jwt(self, jwt): example_payload = {"hello": "world"} - with open("tests/keys/testkey_ec.pub") as fp: + with open("tests/keys/testkey_ec.pub", "r") as fp: example_pubkey = fp.read() example_jwt = ( - b"eyJhbGciOiJFUzM4NCIsInR5cCI6IkpXVCJ9" - b".eyJoZWxsbyI6IndvcmxkIn0" - b".AddMgkmRhzqptDYqlmy_f2dzM6O9YZmVo-txs_CeAJD" - b"NoD8LN7YiPeLmtIhkO5_VZeHHKvtQcGc4lsq-Y72c4dK" - b"pANr1f6HEYhjpBc03u_bv06PYMcr5N2-9k97-qf-JCSb" - b"zqW6R250Q7gNCX5R7NrCl7MTM4DTBZkGbUlqsFUleiGlj" + b"eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9." + b"eyJoZWxsbyI6IndvcmxkIn0.TORyNQab_MoXM7DvNKaTwbrJr4UY" + b"d2SsX8hhlnWelQFmPFSf_JzC2EbLnar92t-bXsDovzxp25ExazrVHkfPkQ" ) - decoded_payload = jwt.decode(example_jwt, example_pubkey) + decoded_payload = jwt.decode( + example_jwt, example_pubkey, algorithms=["ES256"] + ) assert decoded_payload == example_payload # 'Control' RSA JWT created by another library. @@ -224,7 +242,7 @@ def test_decodes_valid_es384_jwt(self, jwt): ) def test_decodes_valid_rs384_jwt(self, jwt): example_payload = {"hello": "world"} - with open("tests/keys/testkey_rsa.pub") as fp: + with open("tests/keys/testkey_rsa.pub", "r") as fp: example_pubkey = fp.read() example_jwt = ( b"eyJhbGciOiJSUzM4NCIsInR5cCI6IkpXVCJ9" @@ -238,7 +256,9 @@ def test_decodes_valid_rs384_jwt(self, jwt): b"uwmrtSWCBUjiN8sqJ00CDgycxKqHfUndZbEAOjcCAhBr" b"qWW3mSVivUfubsYbwUdUG3fSRPjaUPcpe8A" ) - decoded_payload = jwt.decode(example_jwt, example_pubkey) + decoded_payload = jwt.decode( + example_jwt, example_pubkey, algorithms=["RS384"] + ) assert decoded_payload == example_payload @@ -248,7 +268,7 @@ def test_decode_with_expiration(self, jwt, payload): jwt_message = jwt.encode(payload, secret) with pytest.raises(ExpiredSignatureError): - jwt.decode(jwt_message, secret) + jwt.decode(jwt_message, secret, algorithms=["HS256"]) def test_decode_with_notbefore(self, jwt, payload): payload["nbf"] = utc_timestamp() + 10 @@ -256,21 +276,31 @@ def test_decode_with_notbefore(self, jwt, payload): jwt_message = jwt.encode(payload, secret) with pytest.raises(ImmatureSignatureError): - jwt.decode(jwt_message, secret) + jwt.decode(jwt_message, secret, algorithms=["HS256"]) def test_decode_skip_expiration_verification(self, jwt, payload): payload["exp"] = time.time() - 1 secret = "secret" jwt_message = jwt.encode(payload, secret) - jwt.decode(jwt_message, secret, options={"verify_exp": False}) + jwt.decode( + jwt_message, + secret, + algorithms=["HS256"], + options={"verify_exp": False}, + ) def test_decode_skip_notbefore_verification(self, jwt, payload): payload["nbf"] = time.time() + 10 secret = "secret" jwt_message = jwt.encode(payload, secret) - jwt.decode(jwt_message, secret, options={"verify_nbf": False}) + jwt.decode( + jwt_message, + secret, + algorithms=["HS256"], + options={"verify_nbf": False}, + ) def test_decode_with_expiration_with_leeway(self, jwt, payload): payload["exp"] = utc_timestamp() - 2 @@ -281,12 +311,16 @@ def test_decode_with_expiration_with_leeway(self, jwt, payload): # With 3 seconds leeway, should be ok for leeway in (3, timedelta(seconds=3)): - jwt.decode(jwt_message, secret, leeway=leeway) + jwt.decode( + jwt_message, secret, leeway=leeway, algorithms=["HS256"] + ) # With 1 seconds, should fail for leeway in (1, timedelta(seconds=1)): with pytest.raises(ExpiredSignatureError): - jwt.decode(jwt_message, secret, leeway=leeway) + jwt.decode( + jwt_message, secret, leeway=leeway, algorithms=["HS256"] + ) def test_decode_with_notbefore_with_leeway(self, jwt, payload): payload["nbf"] = utc_timestamp() + 10 @@ -294,37 +328,47 @@ def test_decode_with_notbefore_with_leeway(self, jwt, payload): jwt_message = jwt.encode(payload, secret) # With 13 seconds leeway, should be ok - jwt.decode(jwt_message, secret, leeway=13) + jwt.decode(jwt_message, secret, leeway=13, algorithms=["HS256"]) with pytest.raises(ImmatureSignatureError): - jwt.decode(jwt_message, secret, leeway=1) + jwt.decode(jwt_message, secret, leeway=1, algorithms=["HS256"]) def test_check_audience_when_valid(self, jwt): payload = {"some": "payload", "aud": "urn:me"} token = jwt.encode(payload, "secret") - jwt.decode(token, "secret", audience="urn:me") + jwt.decode(token, "secret", audience="urn:me", algorithms=["HS256"]) def test_check_audience_list_when_valid(self, jwt): payload = {"some": "payload", "aud": "urn:me"} token = jwt.encode(payload, "secret") - jwt.decode(token, "secret", audience=["urn:you", "urn:me"]) + jwt.decode( + token, + "secret", + audience=["urn:you", "urn:me"], + algorithms=["HS256"], + ) def test_check_audience_none_specified(self, jwt): payload = {"some": "payload", "aud": "urn:me"} token = jwt.encode(payload, "secret") with pytest.raises(InvalidAudienceError): - jwt.decode(token, "secret") + jwt.decode(token, "secret", algorithms=["HS256"]) def test_raise_exception_invalid_audience_list(self, jwt): payload = {"some": "payload", "aud": "urn:me"} token = jwt.encode(payload, "secret") with pytest.raises(InvalidAudienceError): - jwt.decode(token, "secret", audience=["urn:you", "urn:him"]) + jwt.decode( + token, + "secret", + audience=["urn:you", "urn:him"], + algorithms=["HS256"], + ) def test_check_audience_in_array_when_valid(self, jwt): payload = {"some": "payload", "aud": ["urn:me", "urn:someone-else"]} token = jwt.encode(payload, "secret") - jwt.decode(token, "secret", audience="urn:me") + jwt.decode(token, "secret", audience="urn:me", algorithms=["HS256"]) def test_raise_exception_invalid_audience(self, jwt): payload = {"some": "payload", "aud": "urn:someone-else"} @@ -332,7 +376,9 @@ def test_raise_exception_invalid_audience(self, jwt): token = jwt.encode(payload, "secret") with pytest.raises(InvalidAudienceError): - jwt.decode(token, "secret", audience="urn-me") + jwt.decode( + token, "secret", audience="urn-me", algorithms=["HS256"] + ) def test_raise_exception_invalid_audience_in_array(self, jwt): payload = { @@ -343,7 +389,9 @@ def test_raise_exception_invalid_audience_in_array(self, jwt): token = jwt.encode(payload, "secret") with pytest.raises(InvalidAudienceError): - jwt.decode(token, "secret", audience="urn:me") + jwt.decode( + token, "secret", audience="urn:me", algorithms=["HS256"] + ) def test_raise_exception_token_without_issuer(self, jwt): issuer = "urn:wrong" @@ -353,7 +401,7 @@ def test_raise_exception_token_without_issuer(self, jwt): token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, "secret", issuer=issuer) + jwt.decode(token, "secret", issuer=issuer, algorithms=["HS256"]) assert exc.value.claim == "iss" @@ -362,7 +410,9 @@ def test_raise_exception_token_without_audience(self, jwt): token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, "secret", audience="urn:me") + jwt.decode( + token, "secret", audience="urn:me", algorithms=["HS256"] + ) assert exc.value.claim == "aud" @@ -370,7 +420,7 @@ def test_check_issuer_when_valid(self, jwt): issuer = "urn:foo" payload = {"some": "payload", "iss": "urn:foo"} token = jwt.encode(payload, "secret") - jwt.decode(token, "secret", issuer=issuer) + jwt.decode(token, "secret", issuer=issuer, algorithms=["HS256"]) def test_raise_exception_invalid_issuer(self, jwt): issuer = "urn:wrong" @@ -380,12 +430,17 @@ def test_raise_exception_invalid_issuer(self, jwt): token = jwt.encode(payload, "secret") with pytest.raises(InvalidIssuerError): - jwt.decode(token, "secret", issuer=issuer) + jwt.decode(token, "secret", issuer=issuer, algorithms=["HS256"]) def test_skip_check_audience(self, jwt): payload = {"some": "payload", "aud": "urn:me"} token = jwt.encode(payload, "secret") - jwt.decode(token, "secret", options={"verify_aud": False}) + jwt.decode( + token, + "secret", + options={"verify_aud": False}, + algorithms=["HS256"], + ) def test_skip_check_exp(self, jwt): payload = { @@ -393,7 +448,12 @@ def test_skip_check_exp(self, jwt): "exp": datetime.utcnow() - timedelta(days=1), } token = jwt.encode(payload, "secret") - jwt.decode(token, "secret", options={"verify_exp": False}) + jwt.decode( + token, + "secret", + options={"verify_exp": False}, + algorithms=["HS256"], + ) def test_decode_should_raise_error_if_exp_required_but_not_present( self, jwt @@ -405,7 +465,12 @@ def test_decode_should_raise_error_if_exp_required_but_not_present( token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, "secret", options={"require_exp": True}) + jwt.decode( + token, + "secret", + options={"require": ["exp"]}, + algorithms=["HS256"], + ) assert exc.value.claim == "exp" @@ -419,7 +484,12 @@ def test_decode_should_raise_error_if_iat_required_but_not_present( token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, "secret", options={"require_iat": True}) + jwt.decode( + token, + "secret", + options={"require": ["iat"]}, + algorithms=["HS256"], + ) assert exc.value.claim == "iat" @@ -433,32 +503,27 @@ def test_decode_should_raise_error_if_nbf_required_but_not_present( token = jwt.encode(payload, "secret") with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, "secret", options={"require_nbf": True}) + jwt.decode( + token, + "secret", + options={"require": ["nbf"]}, + algorithms=["HS256"], + ) assert exc.value.claim == "nbf" - def test_decode_should_raise_error_if_claim_required_but_not_present( - self, jwt - ): - claim = "sub" - payload = { - "some": "payload", - # claim not present - } - token = jwt.encode(payload, "secret") - - with pytest.raises(MissingRequiredClaimError) as exc: - jwt.decode(token, "secret", options={"require": [claim]}) - - assert exc.value.claim == claim - def test_skip_check_signature(self, jwt): token = ( "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" ".eyJzb21lIjoicGF5bG9hZCJ9" ".4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZA" ) - jwt.decode(token, "secret", options={"verify_signature": False}) + jwt.decode( + token, + "secret", + options={"verify_signature": False}, + algorithms=["HS256"], + ) def test_skip_check_iat(self, jwt): payload = { @@ -466,7 +531,12 @@ def test_skip_check_iat(self, jwt): "iat": datetime.utcnow() + timedelta(days=1), } token = jwt.encode(payload, "secret") - jwt.decode(token, "secret", options={"verify_iat": False}) + jwt.decode( + token, + "secret", + options={"verify_iat": False}, + algorithms=["HS256"], + ) def test_skip_check_nbf(self, jwt): payload = { @@ -474,54 +544,64 @@ def test_skip_check_nbf(self, jwt): "nbf": datetime.utcnow() + timedelta(days=1), } token = jwt.encode(payload, "secret") - jwt.decode(token, "secret", options={"verify_nbf": False}) + jwt.decode( + token, + "secret", + options={"verify_nbf": False}, + algorithms=["HS256"], + ) def test_custom_json_encoder(self, jwt): class CustomJSONEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, Decimal): return "it worked" - return super().default(o) + return super(CustomJSONEncoder, self).default(o) data = {"some_decimal": Decimal("2.2")} with pytest.raises(TypeError): - jwt.encode(data, "secret") + jwt.encode(data, "secret", algorithms=["HS256"]) token = jwt.encode(data, "secret", json_encoder=CustomJSONEncoder) - payload = jwt.decode(token, "secret") + payload = jwt.decode(token, "secret", algorithms=["HS256"]) assert payload == {"some_decimal": "it worked"} - def test_decode_with_verify_expiration_kwarg(self, jwt, payload): + def test_decode_with_verify_exp_option(self, jwt, payload): payload["exp"] = utc_timestamp() - 1 secret = "secret" jwt_message = jwt.encode(payload, secret) - pytest.deprecated_call( - jwt.decode, jwt_message, secret, verify_expiration=False + jwt.decode( + jwt_message, + secret, + algorithms=["HS256"], + options={"verify_exp": False}, ) with pytest.raises(ExpiredSignatureError): - pytest.deprecated_call( - jwt.decode, jwt_message, secret, verify_expiration=True + jwt.decode( + jwt_message, + secret, + algorithms=["HS256"], + options={"verify_exp": True}, ) def test_decode_with_optional_algorithms(self, jwt, payload): secret = "secret" jwt_message = jwt.encode(payload, secret) - pytest.deprecated_call(jwt.decode, jwt_message, secret) + with pytest.raises(DecodeError) as exc: + jwt.decode(jwt_message, secret) + + assert ( + 'It is required that you pass in a value for the "algorithms" argument when calling decode().' + in str(exc.value) + ) - def test_decode_no_algorithms_verify_false(self, jwt, payload): + def test_decode_no_algorithms_verify_signature_false(self, jwt, payload): secret = "secret" jwt_message = jwt.encode(payload, secret) - try: - pytest.deprecated_call( - jwt.decode, jwt_message, secret, verify=False - ) - except pytest.fail.Exception: - pass - else: - assert False, "Unexpected DeprecationWarning raised." + jwt.decode(jwt_message, secret, options={"verify_signature": False}) diff --git a/tests/test_jwt.py b/tests/test_jwt.py index db96f46a..126fc9b7 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -13,7 +13,7 @@ def test_encode_decode(): payload = {"iss": "jeff", "exp": utc_timestamp() + 15, "claim": "insanity"} secret = "secret" - jwt_message = jwt.encode(payload, secret) - decoded_payload = jwt.decode(jwt_message, secret) + jwt_message = jwt.encode(payload, secret, algorithm="HS256") + decoded_payload = jwt.decode(jwt_message, secret, algorithms=["HS256"]) assert decoded_payload == payload