Skip to content

Commit

Permalink
Replace custom PAX headers with a file header (#78)
Browse files Browse the repository at this point in the history
* Replace custom PAX headers with a file header

* Adjust

* Add helper class for reading and writing header

* Remove unused import

* Improve docstring
  • Loading branch information
emontnemery authored Jan 16, 2025
1 parent c9f0970 commit d88f95c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 30 deletions.
82 changes: 64 additions & 18 deletions securetar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
IV_SIZE = BLOCK_SIZE
DEFAULT_BUFSIZE = 10240

PLAINTEXT_SIZE_HEADER = "_securetar.plaintext_size"
VERSION_HEADER = "_securetar.version"
SECURETAR_VERSION = "2.0"
SECURETAR_MAGIC = b"SecureTar\x02\x00\x00\x00\x00\x00\x00"
SECURETAR_HEADER_SIZE = len(SECURETAR_MAGIC) + 16

GZIP_MAGIC_BYTES = b"\x1f\x8b\x08"
TAR_MAGIC_BYTES = b"ustar"
Expand All @@ -40,6 +39,44 @@
MOD_WRITE = "w"


class SecureTarHeader:
    """Header placed in front of an encrypted SecureTar stream.

    The serialized form is, in order: ``SECURETAR_MAGIC``, the plaintext
    size as an 8-byte big-endian integer, 8 reserved zero bytes, and the
    CBC random bytes.

    Streams that do not begin with ``SECURETAR_MAGIC`` are also accepted
    when reading: they are treated as the magic-less format used in
    earlier releases of SecureTar, where no plaintext size is stored.
    """

    def __init__(self, cbc_rand: bytes, plaintext_size: int | None) -> None:
        """Store the header fields.

        ``plaintext_size`` is ``None`` when the size is not known, which is
        the case for headers read from the magic-less format.
        """
        self.plaintext_size = plaintext_size
        self.cbc_rand = cbc_rand

    @classmethod
    def from_bytes(cls, f: IO[bytes]) -> SecureTarHeader:
        """Read a header from the current position of ``f``.

        Consumes ``len(SECURETAR_MAGIC)`` bytes first. If they are not the
        magic, those bytes are used as the CBC random bytes and nothing
        more is read. Otherwise the size field, the reserved bytes and
        ``IV_SIZE`` CBC random bytes are read as well.
        """
        leading = f.read(len(SECURETAR_MAGIC))
        if leading != SECURETAR_MAGIC:
            # No magic: what was just read already is the CBC random data,
            # and this format carries no plaintext size.
            return cls(leading, None)

        size = int.from_bytes(f.read(8), "big")
        f.read(8)  # Reserved bytes, read only to advance the stream
        return cls(f.read(IV_SIZE), size)

    def to_bytes(self) -> bytes:
        """Serialize the header, always in the format with the magic.

        Raises ``ValueError`` if the plaintext size is ``None``, since the
        size field cannot be written without it.
        """
        size = self.plaintext_size
        if size is None:
            raise ValueError("Plaintext size is required")
        fields = (
            SECURETAR_MAGIC,
            size.to_bytes(8, "big"),
            bytes(8),  # Reserved, written as zeros
            self.cbc_rand,
        )
        return b"".join(fields)


class SecureTarError(Exception):
"""SecureTar error."""

Expand Down Expand Up @@ -88,7 +125,9 @@ def __init__(
self._decrypt: CipherContext | None = None
self._encrypt: CipherContext | None = None
self._padder: padding.PaddingContext | None = None
self._padding = bytearray()
self.padding_length = 0

self.securetar_header: SecureTarHeader | None = None

def create_inner_tar(
self, name: str, key: bytes | None = None, gzip: bool = True
Expand Down Expand Up @@ -147,10 +186,12 @@ def _open_file(self) -> None:
def _setup_cipher(self) -> None:
# Extract IV for CBC
if self._mode == MOD_READ:
cbc_rand = self._file.read(IV_SIZE)
self.securetar_header = SecureTarHeader.from_bytes(self._file)
cbc_rand = self.securetar_header.cbc_rand
else:
cbc_rand = os.urandom(IV_SIZE)
self._file.write(cbc_rand)
self.securetar_header = SecureTarHeader(cbc_rand, 0)
self._file.write(self.securetar_header.to_bytes())

# Create Cipher
self._aes = Cipher(
Expand All @@ -174,8 +215,9 @@ def _close_file(self) -> None:
"""Close file."""
if self._file:
if not self._mode.startswith("r"):
self._padding += self._padder.finalize()
self._file.write(self._encrypt.update(self._padding))
padding = self._padder.finalize()
self._file.write(self._encrypt.update(padding))
self.padding_length = len(padding)
if not self._fileobj:
self._file.close()
self._file = None
Expand All @@ -197,6 +239,8 @@ def __init__(self, parent: SecureTarFile) -> None:
self._pos = 0
self._size = tarinfo.size - IV_SIZE
self._tail: bytes | None = None
if parent.securetar_header.plaintext_size is not None:
self._size -= SECURETAR_HEADER_SIZE

@staticmethod
def _validate_inner_tar(head: bytes) -> None:
Expand Down Expand Up @@ -314,7 +358,7 @@ def __enter__(self) -> tarfile.TarFile:
tar_info.mtime = time.time()
else:
tar_info.mtime = int(time.time())
self.stream = _add_stream(self.outer_tar, tar_info, self._padding)
self.stream = _add_stream(self.outer_tar, tar_info, self)
self.stream.__enter__()
return super().__enter__()

Expand All @@ -326,7 +370,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None:

@contextmanager
def _add_stream(
tar: tarfile.TarFile, tar_info: tarfile.TarInfo, padding: bytearray
tar: tarfile.TarFile, tar_info: tarfile.TarInfo, inner_tar: _InnerSecureTarFile
) -> Generator[BinaryIO, None, None]:
"""Add a stream to the tarfile.
Expand Down Expand Up @@ -375,14 +419,16 @@ def _add_stream(
tar.offset += size_of_inner_tar + padding_size

tar_info.size = size_of_inner_tar
if padding:
tar_info.pax_headers = {
**tar_info.pax_headers,
# The plaintext size is the size of the written ciphertext
# minus the size of the padding and the IV
PLAINTEXT_SIZE_HEADER: str(size_of_inner_tar - len(padding) - IV_SIZE),
VERSION_HEADER: SECURETAR_VERSION,
}
if inner_tar.padding_length:
# Update the size in the header
inner_tar.securetar_header.plaintext_size = (
size_of_inner_tar
- inner_tar.padding_length
- IV_SIZE
- SECURETAR_HEADER_SIZE
)
fileobj.seek(tell_before_adding_inner_file_header + tar_info_header_len)
tar.fileobj.write(inner_tar.securetar_header.to_bytes())
# Now that we know the size of the inner tar, we seek back
# to where we started and re-add the member with the correct size
fileobj.seek(tell_before_adding_inner_file_header)
Expand Down
26 changes: 14 additions & 12 deletions tests/test_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pytest

from securetar import (
SECURETAR_MAGIC,
SecureTarFile,
SecureTarReadError,
_add_stream,
Expand Down Expand Up @@ -218,11 +219,12 @@ def test_tar_inside_tar(

assert main_tar.exists()

# Iterate over the tar file
# Iterate over the tar file, and check there's no securetar header
files = set()
with SecureTarFile(main_tar, "r", gzip=False) as tar_file:
for tar_info in tar_file:
assert "_securetar.plaintext_size" not in tar_info.pax_headers
inner_tar = tar_file.extractfile(tar_info)
assert inner_tar.read(len(SECURETAR_MAGIC)) != SECURETAR_MAGIC
files.add(tar_info.name)
assert files == {"backup.json", *inner_tar_files}

Expand Down Expand Up @@ -377,10 +379,9 @@ def test_encrypted_tar_inside_tar(
file_sizes: dict[str, int] = {}
with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file:
for tar_info in tar_file:
file_sizes[tar_info.name] = tar_info.pax_headers[
"_securetar.plaintext_size"
]
assert tar_info.pax_headers["_securetar.version"] == "2.0"
inner_tar = tar_file.extractfile(tar_info)
assert inner_tar.read(len(SECURETAR_MAGIC)) == SECURETAR_MAGIC
file_sizes[tar_info.name] = int.from_bytes(inner_tar.read(8), "big")
assert set(file_sizes) == {*inner_tar_files}

# Decrypt the inner tar with wrong key
Expand Down Expand Up @@ -423,7 +424,7 @@ def test_encrypted_tar_inside_tar(
file.write(data)

# Check the indicated size is correct
assert inner_tar_path.stat().st_size == int(file_sizes[tar_info.name])
assert inner_tar_path.stat().st_size == file_sizes[tar_info.name]

# Check decrypted file is valid gzip, this fails if the padding is not
# discarded correctly
Expand Down Expand Up @@ -488,12 +489,12 @@ def test_encrypted_gzipped_tar_inside_tar_legacy_format(
fixture_path = Path(__file__).parent.joinpath("fixtures")
main_tar = fixture_path.joinpath("./backup_encrypted_gzipped_legacy_format.tar")

# Iterate over the tar file
# Iterate over the tar file, and check there's no securetar header
files: set[str] = set()
with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file:
for tar_info in tar_file:
assert "_securetar.plaintext_size" not in tar_info.pax_headers
assert "_securetar.version" not in tar_info.pax_headers
inner_tar = tar_file.extractfile(tar_info)
assert inner_tar.read(len(SECURETAR_MAGIC)) != SECURETAR_MAGIC
files.add(tar_info.name)
assert files == {
"core.tar.gz",
Expand Down Expand Up @@ -595,9 +596,10 @@ def test_tar_stream(tmp_path: Path, format: int) -> None:
main_tar = tmp_path.joinpath("backup.tar")

with patch.object(tarfile, "DEFAULT_FORMAT", format):
with SecureTarFile(main_tar, "w", gzip=False) as tar_file:
ostf = SecureTarFile(main_tar, "w", gzip=False)
with ostf as tar_file:
tar_info = tarfile.TarInfo(name="test.txt")
with _add_stream(tar_file, tar_info, bytearray()) as stream:
with _add_stream(tar_file, tar_info, ostf) as stream:
stream.write(b"test")

# Restore
Expand Down

0 comments on commit d88f95c

Please sign in to comment.