From c5f3c0caf619612c9de24242aefe65d25e90490a Mon Sep 17 00:00:00 2001 From: Erik Date: Wed, 22 Jan 2025 15:17:45 +0100 Subject: [PATCH] Add encryption helper --- securetar/__init__.py | 32 +++++++++++++++++-- tests/test_tar.py | 71 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/securetar/__init__.py b/securetar/__init__.py index f25da9d..3ee247d 100644 --- a/securetar/__init__.py +++ b/securetar/__init__.py @@ -183,14 +183,14 @@ def _open_file(self) -> None: fd = os.open(self._name, file_mode, 0o666) self._file = os.fdopen(fd, "rb" if read_mode else "wb") - def _setup_cipher(self) -> None: + def _setup_cipher(self, plaintext_size: int = 0) -> None: # Extract IV for CBC if self._mode == MOD_READ: self.securetar_header = SecureTarHeader.from_bytes(self._file) cbc_rand = self.securetar_header.cbc_rand else: cbc_rand = os.urandom(IV_SIZE) - self.securetar_header = SecureTarHeader(cbc_rand, 0) + self.securetar_header = SecureTarHeader(cbc_rand, plaintext_size) self._file.write(self.securetar_header.to_bytes()) # Create Cipher @@ -297,6 +297,34 @@ def read(self, size: int = 0) -> bytes: finally: self._close_file() + @contextmanager + def encrypt(self, tarinfo: tarfile.TarInfo) -> Generator[BinaryIO, None, None]: + """Encrypt inner tar. + + This is a helper to encrypt data and add padding. + """ + + class EncryptInnerTar: + """Encrypt inner tar file.""" + + def __init__(self, parent: SecureTarFile) -> None: + """Initialize.""" + self._parent = parent + self._size = tarinfo.size + BLOCK_SIZE - tarinfo.size % BLOCK_SIZE + self._size += IV_SIZE + SECURETAR_HEADER_SIZE + + def write(self, data: bytes) -> None: + """Write data.""" + self._parent.write(data) + return None + + try: + self._open_file() + self._setup_cipher(tarinfo.size) + yield EncryptInnerTar(self) + finally: + self._close_file() + def write(self, data: bytes) -> None: """Write data.""" data = self._padder.update(data) diff --git a/tests/test_tar.py b/tests/test_tar.py index d5f0405..2ad1f3e 100644 --- a/tests/test_tar.py +++ b/tests/test_tar.py @@ -177,6 +177,7 @@ def test_create_encrypted_tar(tmp_path: Path, bufsize: int) -> None: assert temp_new.joinpath("README.md").is_file() +@pytest.mark.parametrize("bufsize", [33, 333, 10240, 4 * 2**20]) @pytest.mark.parametrize( ("enable_gzip", "inner_tar_files"), [ @@ -185,7 +186,7 @@ def test_create_encrypted_tar(tmp_path: Path, bufsize: int) -> None: ], ) def test_tar_inside_tar( - tmp_path: Path, enable_gzip: bool, inner_tar_files: tuple[str, ...] + tmp_path: Path, bufsize: int, enable_gzip: bool, inner_tar_files: tuple[str, ...] ) -> None: # Prepare test folder temp_orig = tmp_path.joinpath("orig") @@ -228,6 +229,74 @@ def test_tar_inside_tar( files.add(tar_info.name) assert files == {"backup.json", *inner_tar_files} + # Encrypt the inner tar files + key = os.urandom(16) + temp_encrypted = tmp_path.joinpath("encrypted") + os.makedirs(temp_encrypted, exist_ok=True) + with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file: + for inner_tar_file in inner_tar_files: + tar_info = tar_file.getmember(inner_tar_file) + inner_tar_path = temp_encrypted.joinpath(tar_info.name) + with open(inner_tar_path, "wb") as file: + istf = SecureTarFile( + None, + gzip=False, # We encrypt the compressed tar + key=key, + mode="w", + fileobj=file, + ) + decrypted = tar_file.extractfile(tar_info) + with istf.encrypt(tar_info) as encrypted: + while data := decrypted.read(bufsize): + encrypted.write(data) + + # Check the indicated size is correct + assert ( + inner_tar_path.stat().st_size + == tar_info.size + 16 - tar_info.size % 16 + 16 + 32 + ) + + # Check the encrypted files can be opened + temp_decrypted = tmp_path.joinpath("decrypted") + os.makedirs(temp_decrypted, exist_ok=True) + for inner_tar_file in inner_tar_files: + encrypted_inner_tar_path = temp_encrypted.joinpath(inner_tar_file) + with open(encrypted_inner_tar_path, "rb") as encrypted_inner_tar: + tar_info = tarfile.TarInfo(inner_tar_file) + tar_info.size = encrypted_inner_tar_path.stat().st_size + with open(encrypted_inner_tar_path, "rb") as encrypted_inner_tar: + istf = SecureTarFile( + None, + gzip=False, # We decrypt the compressed tar + key=key, + mode="r", + fileobj=encrypted_inner_tar, + ) + decrypted_inner_tar_path = temp_decrypted.joinpath(inner_tar_file) + with open(decrypted_inner_tar_path, "wb") as file: + with istf.decrypt(tar_info) as decrypted: + while data := decrypted.read(bufsize): + file.write(data) + + # Check decrypted file is valid gzip, this fails if the padding is not + # discarded correctly + if enable_gzip: + assert decrypted_inner_tar_path.stat().st_size > 0 + gzip.decompress(decrypted_inner_tar_path.read_bytes()) + + # Check the tar file can be opened and iterate over it + files = set() + with tarfile.open(decrypted_inner_tar_path, "r") as itf: + for tar_info in itf: + files.add(tar_info.name) + assert files == { + ".", + "README.md", + "test1", + "test1/script.sh", + "test_symlink", + } + # Restore temp_new = tmp_path.joinpath("new") with SecureTarFile(main_tar, "r", gzip=False) as tar_file: