diff --git a/securetar/__init__.py b/securetar/__init__.py index 0c11a92..9386b08 100644 --- a/securetar/__init__.py +++ b/securetar/__init__.py @@ -29,6 +29,8 @@ DEFAULT_BUFSIZE = 10240 PLAINTEXT_SIZE_HEADER = "_securetar.plaintext_size" +VERSION_HEADER = "_securetar.version" +SECURETAR_VERSION = "2.0" MOD_READ = "r" MOD_WRITE = "w" @@ -338,6 +340,7 @@ def _add_stream( # The plaintext size is the size of the written ciphertext # minus the size of the padding and the IV PLAINTEXT_SIZE_HEADER: str(size_of_inner_tar - len(padding) - IV_SIZE), + VERSION_HEADER: SECURETAR_VERSION, } # Now that we know the size of the inner tar, we seek back # to where we started and re-add the member with the correct size diff --git a/tests/test_tar.py b/tests/test_tar.py index 540cd6c..a00acb2 100644 --- a/tests/test_tar.py +++ b/tests/test_tar.py @@ -351,6 +351,7 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None: file_sizes[tar_info.name] = tar_info.pax_headers[ "_securetar.plaintext_size" ] + assert tar_info.pax_headers["_securetar.version"] == "2.0" assert set(file_sizes) == { "core.tar.gz", "core2.tar.gz", @@ -428,6 +429,88 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None: assert temp_inner_new.joinpath("README.md").is_file() +@pytest.mark.parametrize("bufsize", [33, 333, 10240, 4 * 2**20]) +def test_encrypted_gzipped_tar_inside_tar_legacy_format(tmp_path: Path, bufsize: int) -> None: + key = b"0123456789abcdef" + + fixture_path = Path(__file__).parent.joinpath("fixtures") + main_tar = fixture_path.joinpath("./backup_encrypted_gzipped_legacy_format.tar") + + # Iterate over the tar file + files: set[str] = set() + with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file: + for tar_info in tar_file: + assert "_securetar.plaintext_size" not in tar_info.pax_headers + assert "_securetar.version" not in tar_info.pax_headers + files.add(tar_info.name) + assert files == { + "core.tar.gz", + "core2.tar.gz", + "core3.tar.gz", + } + + # Decrypt the inner tar + temp_decrypted = tmp_path.joinpath("decrypted") + os.makedirs(temp_decrypted, exist_ok=True) + with SecureTarFile(main_tar, "r", gzip=False, bufsize=bufsize) as tar_file: + for tar_info in tar_file: + istf = SecureTarFile( + None, + gzip=False, # We decrypt the compressed tar + key=key, + mode="r", + fileobj=tar_file.extractfile(tar_info), + ) + inner_tar_path = temp_decrypted.joinpath(tar_info.name) + with open(inner_tar_path, "wb") as file: + with istf.decrypt(tar_info) as decrypted: + while data := decrypted.read(bufsize): + file.write(data) + + shutil.copy(inner_tar_path, f"./{inner_tar_path.name}.orig") + # Rewrite the gzip footer + # Version 1 of SecureTarFile split the gzip footer in two 16-byte parts, + # combine them back into a single footer. + with open(inner_tar_path, "r+b") as file: + file.seek(-4, io.SEEK_END) + size_bytes = file.read(4) + file.seek(-20, io.SEEK_END) + crc = file.read(4) + file.seek(-36, io.SEEK_END) + last_block = file.read(16) + padding = last_block[-1] + # Note: This is not a full implementation of the padding removal. Version 1 + # did not add any padding if the inner tar size was a multiple of 16. This + # means a full implementation needs to try to first treat the file as unpadded. + # If it fails and the tail is in the range 1..15, it may be padded. Remove + # the padding and try again. If this also fails, the file is corrupted. + # In this test case, we only handle the case where the padding is 1..15. + assert 1 <= padding <= 15 + file.seek(-20 - last_block[-1], io.SEEK_END) + file.write(crc) + file.write(size_bytes) + file.truncate() + shutil.copy(inner_tar_path, f"./{inner_tar_path.name}.fixed") + + # Check decrypted file is valid gzip, this fails if the padding is not + # discarded correctly + assert inner_tar_path.stat().st_size > 0 + gzip.decompress(inner_tar_path.read_bytes()) + + # Check the tar file can be opened and iterate over it + files = set() + with tarfile.open(inner_tar_path, "r:gz") as inner_tar_file: + for tar_info in inner_tar_file: + files.add(tar_info.name) + assert files == { + ".", + "README.md", + "test1", + "test1/script.sh", + "test_symlink", + } + + def test_inner_tar_not_allowed_in_encrypted(tmp_path: Path) -> None: # Create Tarfile main_tar = tmp_path.joinpath("backup.tar")