From 5361f1689c42ddb4377e8853a51f95f65b5f7f1e Mon Sep 17 00:00:00 2001 From: Erik Date: Thu, 9 Jan 2025 10:25:20 +0100 Subject: [PATCH] Add decryption helper --- securetar/__init__.py | 68 ++++++++++++++++++++++++++++++++++++++----- tests/test_tar.py | 45 +++++++++++++++++++++++++--- 2 files changed, 101 insertions(+), 12 deletions(-) diff --git a/securetar/__init__.py b/securetar/__init__.py index c059894..07b74ec 100644 --- a/securetar/__init__.py +++ b/securetar/__init__.py @@ -99,7 +99,18 @@ def __enter__(self) -> tarfile.TarFile: return self._tar # Encrypted/Decrypted Tarfile + self._open_file() + self._setup_cipher() + self._tar = tarfile.open( + fileobj=self, + mode=self._tar_mode, + dereference=False, + bufsize=self._bufsize, + ) + return self._tar + + def _open_file(self) -> None: if self._fileobj: # If we have a fileobj, we don't need to open a file self._file = self._fileobj @@ -113,6 +124,7 @@ def __enter__(self) -> tarfile.TarFile: fd = os.open(self._name, file_mode, 0o666) self._file = os.fdopen(fd, "rb" if read_mode else "wb") + def _setup_cipher(self) -> None: # Extract IV for CBC if self._mode == MOD_READ: cbc_rand = self._file.read(16) @@ -131,19 +143,15 @@ def __enter__(self) -> tarfile.TarFile: self._encrypt = self._aes.encryptor() self._padder = padding.PKCS7(BLOCK_SIZE_BITS).padder() - self._tar = tarfile.open( - fileobj=self, - mode=self._tar_mode, - dereference=False, - bufsize=self._bufsize, - ) - return self._tar - def __exit__(self, exc_type, exc_value, traceback) -> None: """Close file.""" if self._tar: self._tar.close() self._tar = None + self._close_file() + + def _close_file(self) -> None: + """Close file.""" if self._file: if not self._mode.startswith("r"): self._file.write(self._encrypt.update(self._padder.finalize())) @@ -151,6 +159,50 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: self._file.close() self._file = None + @contextmanager + def decrypt(self, tarinfo: tarfile.TarInfo) -> Generator[BinaryIO, None, None]: + """Decrypt inner tar. + + This is a helper to decrypt data and discard the padding. + """ + + class DecryptInnerTar: + """Decrypt inner tar file.""" + + def __init__(self, parent: SecureTarFile) -> None: + """Initialize.""" + self._parent = parent + self._pos = 0 + self._size = tarinfo.size + self._tail = b"" + + def read(self, size: int = 0) -> bytes: + """Read data.""" + if self._tail: + # Finish reading tail + data = self._tail[:size] + self._tail = self._tail[size:] + return data + + data = self._parent.read(size) + self._pos += len(data) + if not data or self._size - self._pos > 16: + return data + + # Last block, read tail and discard padding + data += self._parent.read(self._size - self._pos) + padding_len = data[-1] + data = data[:-padding_len] + self._tail = data[size:] + return data[:size] + + try: + self._open_file() + self._setup_cipher() + yield DecryptInnerTar(self) + finally: + self._close_file() + def write(self, data: bytes) -> None: """Write data.""" data = self._padder.update(data) diff --git a/tests/test_tar.py b/tests/test_tar.py index b724f2b..542a2c7 100644 --- a/tests/test_tar.py +++ b/tests/test_tar.py @@ -315,7 +315,8 @@ def test_gzipped_tar_inside_tar_failure(tmp_path: Path) -> None: tar_file.extractall(path=temp_inner_new, members=tar_file) -def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path) -> None: +@pytest.mark.parametrize("bufsize", [33, 333, 10240, 4 * 2**20]) +def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None: key = os.urandom(16) # Prepare test folder @@ -326,7 +327,7 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path) -> None: # Create Tarfile main_tar = tmp_path.joinpath("backup.tar") inner_tgz_files = ("core.tar.gz", "core2.tar.gz", "core3.tar.gz") - outer_secure_tar_file = SecureTarFile(main_tar, "w", gzip=False) + outer_secure_tar_file = SecureTarFile(main_tar, "w", gzip=False, bufsize=bufsize) with outer_secure_tar_file as outer_tar_file: for inner_tgz_file in inner_tgz_files: with outer_secure_tar_file.create_inner_tar( @@ -342,9 +343,45 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path) -> None: assert len(outer_tar_file.getmembers()) == 3 assert main_tar.exists() + + # Decrypt the inner tar + temp_decrypted = tmp_path.joinpath("decrypted") + os.makedirs(temp_decrypted, exist_ok=True) + with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file: + for tar_info in tar_file: + istf = SecureTarFile( + None, + gzip=False, # We decrypt the compressed tar + key=key, + mode="r", + fileobj=tar_file.extractfile(tar_info), + ) + inner_tar_path = temp_decrypted.joinpath(tar_info.name) + with open(inner_tar_path, "wb") as file: + with istf.decrypt(tar_info) as decrypted: + while data := decrypted.read(bufsize): + file.write(data) + + # Check decrypted file is valid gzip, this fails if the padding is not + # discarded correctly + assert inner_tar_path.stat().st_size > 0 + gzip.decompress(inner_tar_path.read_bytes()) + + # Check the tar file can be opened and iterate over it + files = set() + with tarfile.open(inner_tar_path, "r:gz") as inner_tar_file: + for tarInfo in tar_file: + for tarInfo in tar_file: + files.add(tarInfo.name) + assert files == { + "core.tar.gz", + "core2.tar.gz", + "core3.tar.gz", + } + # Restore temp_new = tmp_path.joinpath("new") - with SecureTarFile(main_tar, "r", gzip=False) as tar_file: + with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file: tar_file.extractall(path=temp_new) assert temp_new.is_dir() @@ -357,7 +394,7 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path) -> None: temp_inner_new = tmp_path.joinpath("{inner_tgz}_inner_new") with SecureTarFile( - temp_new.joinpath(inner_tgz), "r", key=key, gzip=True + temp_new.joinpath(inner_tgz), "r", key=key, gzip=True, bufsize=bufsize ) as tar_file: tar_file.extractall(path=temp_inner_new, members=tar_file)