Skip to content

Commit

Permalink
Switch over JWEs
Browse files Browse the repository at this point in the history
Instead of our own mix of base64,AES,iv... just use JWE for the same
functionality.
  • Loading branch information
marcospri committed Jan 11, 2023
1 parent a7c7b0e commit 744dbf0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 75 deletions.
53 changes: 15 additions & 38 deletions src/h_vialib/secure/encryption.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,26 @@
import base64
import json
from typing import Tuple

from Cryptodome import Random
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad
from jose import constants, jwe


class Encryption:
JWE_ALGORITHM = constants.ALGORITHMS.DIR
JWE_ENCRYPTION = constants.ALGORITHMS.A128CBC_HS256

def __init__(self, secret: bytes):
self._secret = pad(secret, 16)
self._secret = secret.ljust(32)[:32]

def encrypt_dict(self, payload: dict) -> str:
"""
Encrypt a dictionary using AES.
The returned string is a json encoded dictionary containing both the AES IV
and the cipher text.
"""
dict_json = json.dumps(payload).encode("utf-8")
aes_iv, encrypted_json = self._encrypt(dict_json)

return json.dumps(
{
"iv": base64.urlsafe_b64encode(aes_iv).decode("utf-8"),
"payload": base64.urlsafe_b64encode(encrypted_json).decode("utf-8"),
}
)

def decrypt_dict(self, payload: str) -> dict:
"""Decypts payloads created by `encrypt_dict`."""
payload_dict = json.loads(payload)
"""Encrypt a dictionary as a JWE."""

aes_iv = payload_dict.get("iv", "")
cipher = payload_dict.get("payload", "")
return jwe.encrypt(
json.dumps(payload),
self._secret,
algorithm=self.JWE_ALGORITHM,
encryption=self.JWE_ENCRYPTION,
).decode("utf-8")

aes_iv = base64.urlsafe_b64decode(aes_iv)
cipher = base64.urlsafe_b64decode(cipher)

return json.loads(self._decrypt(aes_iv, cipher))

def _decrypt(self, aes_iv, encrypted) -> bytes:
cipher = AES.new(self._secret, AES.MODE_CFB, aes_iv)
return cipher.decrypt(encrypted)
def decrypt_dict(self, payload: str) -> dict:
"""Decrypts payloads created by `encrypt_dict`."""

def _encrypt(self, plain_text: bytes) -> Tuple[bytes, bytes]:
aes_iv = Random.new().read(AES.block_size)
return (aes_iv, AES.new(self._secret, AES.MODE_CFB, aes_iv).encrypt(plain_text))
return json.loads(jwe.decrypt(payload, self._secret))
54 changes: 17 additions & 37 deletions tests/unit/h_vialib/secure/encryption_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pytest
from Cryptodome.Random import get_random_bytes
from Cryptodome.Util.Padding import pad
from jose import constants

from h_vialib.secure import Encryption

Expand All @@ -13,58 +12,39 @@ def test_encrypt_dict_round_trip(self, encryption):

assert encryption.decrypt_dict(encrypted) == payload_dict

def test_encrypt_dict(self, encryption, secret, AES, Random, base64, json):
def test_encrypt_dict(self, encryption, secret, json, jwe):
payload_dict = {"some": "data"}

encrypted = encryption.encrypt_dict(payload_dict)

Random.new.return_value.read.assert_called_once_with(AES.block_size)
AES.new.assert_called_once_with(
pad(secret, 16), AES.MODE_CFB, Random.new.return_value.read.return_value
json.dumps.assert_called_with(payload_dict)
jwe.encrypt.assert_called_once_with(
json.dumps.return_value,
secret.ljust(32),
algorithm=constants.ALGORITHMS.DIR,
encryption=constants.ALGORITHMS.A128CBC_HS256,
)
assert encrypted == jwe.encrypt.return_value.decode.return_value

AES.new.return_value.encrypt.assert_called_once_with(
json.dumps.return_value.encode.return_value
)
json.dumps.assert_called_with(
{
"iv": base64.urlsafe_b64encode.return_value.decode.return_value,
"payload": base64.urlsafe_b64encode.return_value.decode.return_value,
}
)
assert encrypted == json.dumps.return_value

def test_decrypt_dict(self, encryption, secret, AES, base64, json):
def test_decrypt_dict(self, encryption, secret, jwe, json):
plain_text_dict = encryption.decrypt_dict("payload")

AES.new.assert_called_once_with(
pad(secret, 16), AES.MODE_CFB, base64.urlsafe_b64decode.return_value
)
AES.new.return_value.decrypt.assert_called_once_with(
base64.urlsafe_b64decode.return_value
)
jwe.decrypt.assert_called_once_with("payload", secret.ljust(32))
json.loads.assert_called_once_with(jwe.decrypt.return_value)
assert plain_text_dict == json.loads.return_value

@pytest.fixture
def secret(self):
return get_random_bytes(12)
return b"VERY SECRET"

@pytest.fixture
def encryption(self, secret):
return Encryption(secret)

@pytest.fixture
def AES(self, patch):
return patch("h_vialib.secure.encryption.AES")

@pytest.fixture
def Random(self, patch):
return patch("h_vialib.secure.encryption.Random")

@pytest.fixture
def base64(self, patch):
return patch("h_vialib.secure.encryption.base64")

@pytest.fixture
def json(self, patch):
return patch("h_vialib.secure.encryption.json")

@pytest.fixture
def jwe(self, patch):
return patch("h_vialib.secure.encryption.jwe")

0 comments on commit 744dbf0

Please sign in to comment.