Skip to content

Commit

Permalink
Add encryption helper
Browse files Browse the repository at this point in the history
  • Loading branch information
emontnemery committed Jan 22, 2025
1 parent eb85e5e commit c5f3c0c
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 3 deletions.
32 changes: 30 additions & 2 deletions securetar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ def _open_file(self) -> None:
fd = os.open(self._name, file_mode, 0o666)
self._file = os.fdopen(fd, "rb" if read_mode else "wb")

def _setup_cipher(self) -> None:
def _setup_cipher(self, plaintext_size: int = 0) -> None:
# Extract IV for CBC
if self._mode == MOD_READ:
self.securetar_header = SecureTarHeader.from_bytes(self._file)
cbc_rand = self.securetar_header.cbc_rand
else:
cbc_rand = os.urandom(IV_SIZE)
self.securetar_header = SecureTarHeader(cbc_rand, 0)
self.securetar_header = SecureTarHeader(cbc_rand, plaintext_size)
self._file.write(self.securetar_header.to_bytes())

# Create Cipher
Expand Down Expand Up @@ -297,6 +297,34 @@ def read(self, size: int = 0) -> bytes:
finally:
self._close_file()

@contextmanager
def encrypt(self, tarinfo: tarfile.TarInfo) -> Generator[BinaryIO, None, None]:
"""Encrypt inner tar.
This is a helper to encrypt data and add padding.
"""

class EncryptInnerTar:
"""Encrypt inner tar file."""

def __init__(self, parent: SecureTarFile) -> None:
"""Initialize."""
self._parent = parent
self._size = tarinfo.size + BLOCK_SIZE - tarinfo.size % BLOCK_SIZE
self._size += IV_SIZE + SECURETAR_HEADER_SIZE

def write(self, data: bytes) -> None:
"""Write data."""
self._parent.write(data)
return None

try:
self._open_file()
self._setup_cipher(tarinfo.size)
yield EncryptInnerTar(self)
finally:
self._close_file()

def write(self, data: bytes) -> None:
"""Write data."""
data = self._padder.update(data)
Expand Down
71 changes: 70 additions & 1 deletion tests/test_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def test_create_encrypted_tar(tmp_path: Path, bufsize: int) -> None:
assert temp_new.joinpath("README.md").is_file()


@pytest.mark.parametrize("bufsize", [33, 333, 10240, 4 * 2**20])
@pytest.mark.parametrize(
("enable_gzip", "inner_tar_files"),
[
Expand All @@ -185,7 +186,7 @@ def test_create_encrypted_tar(tmp_path: Path, bufsize: int) -> None:
],
)
def test_tar_inside_tar(
tmp_path: Path, enable_gzip: bool, inner_tar_files: tuple[str, ...]
tmp_path: Path, bufsize: int, enable_gzip: bool, inner_tar_files: tuple[str, ...]
) -> None:
# Prepare test folder
temp_orig = tmp_path.joinpath("orig")
Expand Down Expand Up @@ -228,6 +229,74 @@ def test_tar_inside_tar(
files.add(tar_info.name)
assert files == {"backup.json", *inner_tar_files}

# Encrypt the inner tar files
key = os.urandom(16)
temp_encrypted = tmp_path.joinpath("encrypted")
os.makedirs(temp_encrypted, exist_ok=True)
with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file:
for inner_tar_file in inner_tar_files:
tar_info = tar_file.getmember(inner_tar_file)
inner_tar_path = temp_encrypted.joinpath(tar_info.name)
with open(inner_tar_path, "wb") as file:
istf = SecureTarFile(
None,
gzip=False, # We encrypt the compressed tar
key=key,
mode="w",
fileobj=file,
)
decrypted = tar_file.extractfile(tar_info)
with istf.encrypt(tar_info) as encrypted:
while data := decrypted.read(bufsize):
encrypted.write(data)

# Check the indicated size is correct
assert (
inner_tar_path.stat().st_size
== tar_info.size + 16 - tar_info.size % 16 + 16 + 32
)

# Check the encrypted files can be opened
temp_decrypted = tmp_path.joinpath("decrypted")
os.makedirs(temp_decrypted, exist_ok=True)
for inner_tar_file in inner_tar_files:
encrypted_inner_tar_path = temp_encrypted.joinpath(inner_tar_file)
with open(encrypted_inner_tar_path, "rb") as encrypted_inner_tar:
tar_info = tarfile.TarInfo(inner_tar_file)
tar_info.size = encrypted_inner_tar_path.stat().st_size
with open(encrypted_inner_tar_path, "rb") as encrypted_inner_tar:
istf = SecureTarFile(
None,
gzip=False, # We decrypt the compressed tar
key=key,
mode="r",
fileobj=encrypted_inner_tar,
)
decrypted_inner_tar_path = temp_decrypted.joinpath(inner_tar_file)
with open(decrypted_inner_tar_path, "wb") as file:
with istf.decrypt(tar_info) as decrypted:
while data := decrypted.read(bufsize):
file.write(data)

# Check decrypted file is valid gzip, this fails if the padding is not
# discarded correctly
if enable_gzip:
assert decrypted_inner_tar_path.stat().st_size > 0
gzip.decompress(decrypted_inner_tar_path.read_bytes())

# Check the tar file can be opened and iterate over it
files = set()
with tarfile.open(decrypted_inner_tar_path, "r") as itf:
for tar_info in itf:
files.add(tar_info.name)
assert files == {
".",
"README.md",
"test1",
"test1/script.sh",
"test_symlink",
}

# Restore
temp_new = tmp_path.joinpath("new")
with SecureTarFile(main_tar, "r", gzip=False) as tar_file:
Expand Down

0 comments on commit c5f3c0c

Please sign in to comment.