Skip to content

Commit

Permalink
Add header with securetar version
Browse files Browse the repository at this point in the history
  • Loading branch information
emontnemery committed Jan 13, 2025
1 parent 1afc048 commit 72b446c
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
3 changes: 3 additions & 0 deletions securetar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
DEFAULT_BUFSIZE = 10240

PLAINTEXT_SIZE_HEADER = "_securetar.plaintext_size"
VERSION_HEADER = "_securetar.version"
SECURETAR_VERSION = "2.0"

MOD_READ = "r"
MOD_WRITE = "w"
Expand Down Expand Up @@ -338,6 +340,7 @@ def _add_stream(
# The plaintext size is the size of the written ciphertext
# minus the size of the padding and the IV
PLAINTEXT_SIZE_HEADER: str(size_of_inner_tar - len(padding) - IV_SIZE),
VERSION_HEADER: SECURETAR_VERSION,
}
# Now that we know the size of the inner tar, we seek back
# to where we started and re-add the member with the correct size
Expand Down
Binary file not shown.
85 changes: 85 additions & 0 deletions tests/test_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None:
file_sizes[tar_info.name] = tar_info.pax_headers[
"_securetar.plaintext_size"
]
assert tar_info.pax_headers["_securetar.version"] == "2.0"
assert set(file_sizes) == {
"core.tar.gz",
"core2.tar.gz",
Expand Down Expand Up @@ -428,6 +429,90 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None:
assert temp_inner_new.joinpath("README.md").is_file()


@pytest.mark.parametrize("bufsize", [33, 333, 10240, 4 * 2**20])
def test_encrypted_gzipped_tar_inside_tar_legacy_format(
tmp_path: Path, bufsize: int
) -> None:
key = b"0123456789abcdef"

fixture_path = Path(__file__).parent.joinpath("fixtures")
main_tar = fixture_path.joinpath("./backup_encrypted_gzipped_legacy_format.tar")

# Iterate over the tar file
files: set[str] = set()
with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file:
for tar_info in tar_file:
assert "_securetar.plaintext_size" not in tar_info.pax_headers
assert "_securetar.version" not in tar_info.pax_headers
files.add(tar_info.name)
assert files == {
"core.tar.gz",
"core2.tar.gz",
"core3.tar.gz",
}

# Decrypt the inner tar
temp_decrypted = tmp_path.joinpath("decrypted")
os.makedirs(temp_decrypted, exist_ok=True)
with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file:
for tar_info in tar_file:
istf = SecureTarFile(
None,
gzip=False, # We decrypt the compressed tar
key=key,
mode="r",
fileobj=tar_file.extractfile(tar_info),
)
inner_tar_path = temp_decrypted.joinpath(tar_info.name)
with open(inner_tar_path, "wb") as file:
with istf.decrypt(tar_info) as decrypted:
while data := decrypted.read(bufsize):
file.write(data)

shutil.copy(inner_tar_path, f"./{inner_tar_path.name}.orig")
# Rewrite the gzip footer
# Version 1 of SecureTarFile split the gzip footer in two 16-byte parts,
# combine them back into a single footer.
with open(inner_tar_path, "r+b") as file:
file.seek(-4, io.SEEK_END)
size_bytes = file.read(4)
file.seek(-20, io.SEEK_END)
crc = file.read(4)
file.seek(-36, io.SEEK_END)
last_block = file.read(16)
padding = last_block[-1]
# Note: This is not a full implementation of the padding removal. Version 1
# did not add any padding if the inner tar size was a multiple of 16. This
# means a full implementation needs to try to first treat the file as unpadded.
# If it fails and the tail is in the range 1..15, it may be padded. Remove
# the padding and try again. If this also fails, the file is corrupted.
# In this test case, we only handle the case where the padding is 1..15.
assert 1 <= padding <= 15
file.seek(-20 - last_block[-1], io.SEEK_END)
file.write(crc)
file.write(size_bytes)
file.truncate()
shutil.copy(inner_tar_path, f"./{inner_tar_path.name}.fixed")

# Check decrypted file is valid gzip, this fails if the padding is not
# discarded correctly
assert inner_tar_path.stat().st_size > 0
gzip.decompress(inner_tar_path.read_bytes())

# Check the tar file can be opened and iterate over it
files = set()
with tarfile.open(inner_tar_path, "r:gz") as inner_tar_file:
for tar_info in inner_tar_file:
files.add(tar_info.name)
assert files == {
".",
"README.md",
"test1",
"test1/script.sh",
"test_symlink",
}


def test_inner_tar_not_allowed_in_encrypted(tmp_path: Path) -> None:
# Create Tarfile
main_tar = tmp_path.joinpath("backup.tar")
Expand Down

0 comments on commit 72b446c

Please sign in to comment.