Skip to content

Commit

Permalink
Merge pull request #71 from emontnemery/filter_full_match
Browse files: browse the repository at this point in the history
Modify atomic_contents_add to accept a filter function
  • Loading branch information
agners authored Jan 14, 2025
2 parents c5c02e3 + 198af66 commit f113c50
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 51 deletions.
44 changes: 24 additions & 20 deletions securetar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from collections.abc import Generator
from collections.abc import Callable, Generator
import hashlib
import logging
import os
Expand Down Expand Up @@ -381,41 +381,45 @@ def secure_path(tar: tarfile.TarFile) -> Generator[tarfile.TarInfo, None, None]:
yield member


def _is_excluded_by_filter(path: PurePath, exclude_list: list[str]) -> bool:
"""Filter to filter excludes."""

for exclude in exclude_list:
if not path.match(exclude):
continue
_LOGGER.debug("Ignoring %s because of %s", path, exclude)
return True

return False


def atomic_contents_add(
tar_file: tarfile.TarFile,
origin_path: Path,
excludes: list[str],
file_filter: Callable[[PurePath], bool],
arcname: str = ".",
) -> None:
"""Append directories and/or files to the TarFile if excludes wont filter."""
"""Append directories and/or files to the TarFile if file_filter returns False.
:param file_filter: A filter function, should return True if the item should
be excluded from the archive. The function should take a single argument, a
pathlib.PurePath object representing the relative path of the item to be archived.
"""

if _is_excluded_by_filter(origin_path, excludes):
if file_filter(PurePath(arcname)):
return None
return _atomic_contents_add(tar_file, origin_path, file_filter, arcname)


def _atomic_contents_add(
tar_file: tarfile.TarFile,
origin_path: Path,
file_filter: Callable[[PurePath], bool],
arcname: str,
) -> None:
"""Append directories and/or files to the TarFile if file_filter returns False."""

# Add directory only (recursive=False) to ensure we also archive empty directories
tar_file.add(origin_path.as_posix(), arcname=arcname, recursive=False)

for directory_item in origin_path.iterdir():
if _is_excluded_by_filter(directory_item, excludes):
item_arcpath = PurePath(arcname, directory_item.name)
if file_filter(PurePath(item_arcpath)):
continue

arcpath = PurePath(arcname, directory_item.name).as_posix()
item_arcname = item_arcpath.as_posix()
if directory_item.is_dir() and not directory_item.is_symlink():
atomic_contents_add(tar_file, directory_item, excludes, arcpath)
_atomic_contents_add(tar_file, directory_item, file_filter, item_arcname)
continue

tar_file.add(directory_item.as_posix(), arcname=arcpath, recursive=False)
tar_file.add(directory_item.as_posix(), arcname=item_arcname, recursive=False)

return None
62 changes: 31 additions & 31 deletions tests/test_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
import time
from dataclasses import dataclass
from pathlib import Path, PurePath
from unittest.mock import patch
from unittest.mock import Mock, patch
import pytest

from securetar import (
SecureTarFile,
_is_excluded_by_filter,
_add_stream,
atomic_contents_add,
secure_path,
Expand Down Expand Up @@ -48,32 +47,33 @@ def test_not_secure_path() -> None:
assert [] == list(secure_path(test_list))


def test_is_excluded_by_filter_good() -> None:
def test_file_filter(tmp_path: Path) -> None:
"""Test exclude filter."""
filter_list = ["not/match", "/dev/xy"]
test_list = [
PurePath("test.txt"),
PurePath("data/xy.blob"),
PurePath("bla/blu/ble"),
PurePath("data/../xy.blob"),
]

for path_object in test_list:
assert _is_excluded_by_filter(path_object, filter_list) is False


def test_is_exclude_by_filter_bad() -> None:
"""Test exclude filter."""
filter_list = ["*.txt", "data/*", "bla/blu/ble"]
test_list = [
PurePath("test.txt"),
PurePath("data/xy.blob"),
PurePath("bla/blu/ble"),
PurePath("data/test_files/kk.txt"),
]
file_filter = Mock(return_value=False)
# Prepare test folder
temp_orig = tmp_path.joinpath("orig")
fixture_data = Path(__file__).parent.joinpath("fixtures/tar_data")
shutil.copytree(fixture_data, temp_orig, symlinks=True)

for path_object in test_list:
assert _is_excluded_by_filter(path_object, filter_list) is True
# Create Tarfile
temp_tar = tmp_path.joinpath("backup.tar")
with SecureTarFile(temp_tar, "w") as tar_file:
atomic_contents_add(
tar_file,
temp_orig,
file_filter=file_filter,
arcname=".",
)
paths = [call[1][0] for call in file_filter.mock_calls]
expected_paths = {
PurePath("."),
PurePath("README.md"),
PurePath("test_symlink"),
PurePath("test1"),
PurePath("test1/script.sh"),
}
assert len(paths) == len(expected_paths)
assert set(paths) == expected_paths


@pytest.mark.parametrize("bufsize", [10240, 4 * 2**20])
Expand All @@ -90,7 +90,7 @@ def test_create_pure_tar(tmp_path: Path, bufsize: int) -> None:
atomic_contents_add(
tar_file,
temp_orig,
excludes=[],
file_filter=lambda _: False,
arcname=".",
)

Expand Down Expand Up @@ -134,7 +134,7 @@ def test_create_encrypted_tar(tmp_path: Path, bufsize: int) -> None:
atomic_contents_add(
tar_file,
temp_orig,
excludes=[],
file_filter=lambda _: False,
arcname=".",
)

Expand Down Expand Up @@ -193,7 +193,7 @@ def test_gzipped_tar_inside_tar(tmp_path: Path) -> None:
atomic_contents_add(
inner_tar_file,
temp_orig,
excludes=[],
file_filter=lambda _: False,
arcname=".",
)

Expand Down Expand Up @@ -271,7 +271,7 @@ def test_gzipped_tar_inside_tar_failure(tmp_path: Path) -> None:
atomic_contents_add(
inner_tar_file,
temp_orig,
excludes=[],
file_filter=lambda _: False,
arcname=".",
)
raise ValueError("Test")
Expand Down Expand Up @@ -336,7 +336,7 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None:
atomic_contents_add(
inner_tar_file,
temp_orig,
excludes=[],
file_filter=lambda _: False,
arcname=".",
)

Expand Down

0 comments on commit f113c50

Please sign in to comment.