Skip to content

Commit

Permalink
Merge pull request #74 from emontnemery/improve_tests
Browse files Browse the repository at this point in the history
Improve tests
  • Loading branch information
agners authored Jan 14, 2025
2 parents 2b70109 + 26a896f commit 52ec65a
Showing 1 changed file with 63 additions and 40 deletions.
103 changes: 63 additions & 40 deletions tests/test_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ def test_create_encrypted_tar(tmp_path: Path, bufsize: int) -> None:
# Iterate over the tar file
files = set()
with SecureTarFile(temp_tar, "r", key=key, bufsize=bufsize) as tar_file:
for tarInfo in tar_file:
files.add(tarInfo.name)
for tar_info in tar_file:
files.add(tar_info.name)
assert files == {
".",
"README.md",
Expand Down Expand Up @@ -175,20 +175,28 @@ def test_create_encrypted_tar(tmp_path: Path, bufsize: int) -> None:
assert temp_new.joinpath("README.md").is_file()


def test_gzipped_tar_inside_tar(tmp_path: Path) -> None:
@pytest.mark.parametrize(
("enable_gzip", "inner_tar_files"),
[
(True, ("core.tar.gz", "core2.tar.gz", "core3.tar.gz")),
(False, ("core.tar", "core2.tar", "core3.tar")),
],
)
def test_tar_inside_tar(
tmp_path: Path, enable_gzip: bool, inner_tar_files: tuple[str, ...]
) -> None:
# Prepare test folder
temp_orig = tmp_path.joinpath("orig")
fixture_data = Path(__file__).parent.joinpath("fixtures/tar_data")
shutil.copytree(fixture_data, temp_orig, symlinks=True)

# Create Tarfile
main_tar = tmp_path.joinpath("backup.tar")
inner_tgz_files = ("core.tar.gz", "core2.tar.gz", "core3.tar.gz")
outer_secure_tar_file = SecureTarFile(main_tar, "w", gzip=False)
with outer_secure_tar_file as outer_tar_file:
for inner_tgz_file in inner_tgz_files:
for inner_tar_file in inner_tar_files:
with outer_secure_tar_file.create_inner_tar(
inner_tgz_file, gzip=True
inner_tar_file, gzip=enable_gzip
) as inner_tar_file:
atomic_contents_add(
inner_tar_file,
Expand All @@ -208,29 +216,41 @@ def test_gzipped_tar_inside_tar(tmp_path: Path) -> None:
assert len(outer_tar_file.getmembers()) == 4

assert main_tar.exists()

# Iterate over the tar file
files = set()
with SecureTarFile(main_tar, "r", gzip=False) as tar_file:
for tar_info in tar_file:
assert "_securetar.plaintext_size" not in tar_info.pax_headers
files.add(tar_info.name)
assert files == {"backup.json", *inner_tar_files}

# Restore
temp_new = tmp_path.joinpath("new")
with SecureTarFile(main_tar, "r", gzip=False) as tar_file:
tar_file.extractall(path=temp_new)

assert temp_new.is_dir()
core_tar_gz = temp_new.joinpath("core.tar.gz")
assert core_tar_gz.is_file()
compressed = core_tar_gz.read_bytes()
uncompressed = gzip.decompress(core_tar_gz.read_bytes())
assert len(uncompressed) > len(compressed)

assert temp_new.joinpath("core2.tar.gz").is_file()
assert temp_new.joinpath("core3.tar.gz").is_file()
core_tar = temp_new.joinpath(inner_tar_files[0])
assert core_tar.is_file()
if enable_gzip:
compressed = core_tar.read_bytes()
uncompressed = gzip.decompress(core_tar.read_bytes())
assert len(uncompressed) > len(compressed)

assert temp_new.joinpath(inner_tar_files[1]).is_file()
assert temp_new.joinpath(inner_tar_files[2]).is_file()
backup_json = temp_new.joinpath("backup.json")
assert backup_json.is_file()
assert backup_json.read_bytes() == raw_bytes

# Extract inner tars
for inner_tgz in inner_tgz_files:
temp_inner_new = tmp_path.joinpath("{inner_tgz}_inner_new")
for inner_tar_file in inner_tar_files:
temp_inner_new = tmp_path.joinpath(f"{inner_tar_file}_inner_new")

with SecureTarFile(temp_new.joinpath(inner_tgz), "r", gzip=True) as tar_file:
with SecureTarFile(
temp_new.joinpath(inner_tar_file), "r", gzip=enable_gzip
) as tar_file:
tar_file.extractall(path=temp_inner_new, members=tar_file)

assert temp_inner_new.is_dir()
Expand Down Expand Up @@ -316,7 +336,16 @@ def test_gzipped_tar_inside_tar_failure(tmp_path: Path) -> None:


@pytest.mark.parametrize("bufsize", [33, 333, 10240, 4 * 2**20])
def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None:
@pytest.mark.parametrize(
("enable_gzip", "inner_tar_files"),
[
(True, ("core.tar.gz", "core2.tar.gz", "core3.tar.gz")),
(False, ("core.tar", "core2.tar", "core3.tar")),
],
)
def test_encrypted_tar_inside_tar(
tmp_path: Path, bufsize: int, enable_gzip: bool, inner_tar_files: tuple[str, ...]
) -> None:
key = os.urandom(16)

# Prepare test folder
Expand All @@ -326,12 +355,11 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None:

# Create Tarfile
main_tar = tmp_path.joinpath("backup.tar")
inner_tgz_files = ("core.tar.gz", "core2.tar.gz", "core3.tar.gz")
outer_secure_tar_file = SecureTarFile(main_tar, "w", gzip=False, bufsize=bufsize)
with outer_secure_tar_file as outer_tar_file:
for inner_tgz_file in inner_tgz_files:
for inner_tar_file in inner_tar_files:
with outer_secure_tar_file.create_inner_tar(
inner_tgz_file, key=key, gzip=True
inner_tar_file, key=key, gzip=enable_gzip
) as inner_tar_file:
atomic_contents_add(
inner_tar_file,
Expand All @@ -352,11 +380,7 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None:
"_securetar.plaintext_size"
]
assert tar_info.pax_headers["_securetar.version"] == "2.0"
assert set(file_sizes) == {
"core.tar.gz",
"core2.tar.gz",
"core3.tar.gz",
}
assert set(file_sizes) == {*inner_tar_files}

# Decrypt the inner tar
temp_decrypted = tmp_path.joinpath("decrypted")
Expand All @@ -381,12 +405,13 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None:

# Check decrypted file is valid gzip, this fails if the padding is not
# discarded correctly
assert inner_tar_path.stat().st_size > 0
gzip.decompress(inner_tar_path.read_bytes())
if enable_gzip:
assert inner_tar_path.stat().st_size > 0
gzip.decompress(inner_tar_path.read_bytes())

# Check the tar file can be opened and iterate over it
files = set()
with tarfile.open(inner_tar_path, "r:gz") as inner_tar_file:
with tarfile.open(inner_tar_path, "r") as inner_tar_file:
for tar_info in inner_tar_file:
files.add(tar_info.name)
assert files == {
Expand All @@ -403,16 +428,19 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None:
tar_file.extractall(path=temp_new)

assert temp_new.is_dir()
assert temp_new.joinpath("core.tar.gz").is_file()
assert temp_new.joinpath("core2.tar.gz").is_file()
assert temp_new.joinpath("core3.tar.gz").is_file()
for inner_tar_file in inner_tar_files:
assert temp_new.joinpath(inner_tar_file).is_file()

# Extract inner encrypted tars
for inner_tgz in inner_tgz_files:
temp_inner_new = tmp_path.joinpath("{inner_tgz}_inner_new")
for inner_tar_file in inner_tar_files:
temp_inner_new = tmp_path.joinpath(f"{inner_tar_file}_inner_new")

with SecureTarFile(
temp_new.joinpath(inner_tgz), "r", key=key, gzip=True, bufsize=bufsize
temp_new.joinpath(inner_tar_file),
"r",
key=key,
gzip=enable_gzip,
bufsize=bufsize,
) as tar_file:
tar_file.extractall(path=temp_inner_new, members=tar_file)

Expand Down Expand Up @@ -541,11 +569,6 @@ def test_outer_tar_must_not_be_compressed(tmp_path: Path) -> None:
"format", [tarfile.PAX_FORMAT, tarfile.GNU_FORMAT, tarfile.USTAR_FORMAT]
)
def test_tar_stream(tmp_path: Path, format: int) -> None:
# Prepare test folder
temp_orig = tmp_path.joinpath("orig")
fixture_data = Path(__file__).parent.joinpath("fixtures/tar_data")
shutil.copytree(fixture_data, temp_orig, symlinks=True)

# Create Tarfile
main_tar = tmp_path.joinpath("backup.tar")

Expand Down

0 comments on commit 52ec65a

Please sign in to comment.