diff --git a/securetar/__init__.py b/securetar/__init__.py index 9386b08..04d7d05 100644 --- a/securetar/__init__.py +++ b/securetar/__init__.py @@ -2,7 +2,7 @@ from __future__ import annotations -from collections.abc import Generator +from collections.abc import Callable, Generator import hashlib import logging import os @@ -381,41 +381,45 @@ def secure_path(tar: tarfile.TarFile) -> Generator[tarfile.TarInfo, None, None]: yield member -def _is_excluded_by_filter(path: PurePath, exclude_list: list[str]) -> bool: - """Filter to filter excludes.""" - - for exclude in exclude_list: - if not path.match(exclude): - continue - _LOGGER.debug("Ignoring %s because of %s", path, exclude) - return True - - return False - - def atomic_contents_add( tar_file: tarfile.TarFile, origin_path: Path, - excludes: list[str], + file_filter: Callable[[PurePath], bool], arcname: str = ".", ) -> None: - """Append directories and/or files to the TarFile if excludes wont filter.""" + """Append directories and/or files to the TarFile if file_filter returns False. + + :param file_filter: A filter function, should return True if the item should + be excluded from the archive. The function should take a single argument, a + pathlib.PurePath object representing the relative path of the item to be archived. + """ - if _is_excluded_by_filter(origin_path, excludes): + if file_filter(PurePath(arcname)): return None + return _atomic_contents_add(tar_file, origin_path, file_filter, arcname) + + +def _atomic_contents_add( + tar_file: tarfile.TarFile, + origin_path: Path, + file_filter: Callable[[PurePath], bool], + arcname: str, +) -> None: + """Append directories and/or files to the TarFile if file_filter returns False.""" # Add directory only (recursive=False) to ensure we also archive empty directories tar_file.add(origin_path.as_posix(), arcname=arcname, recursive=False) for directory_item in origin_path.iterdir(): - if _is_excluded_by_filter(directory_item, excludes): + item_arcpath = PurePath(arcname, directory_item.name) + if file_filter(PurePath(item_arcpath)): continue - arcpath = PurePath(arcname, directory_item.name).as_posix() + item_arcname = item_arcpath.as_posix() if directory_item.is_dir() and not directory_item.is_symlink(): - atomic_contents_add(tar_file, directory_item, excludes, arcpath) + _atomic_contents_add(tar_file, directory_item, file_filter, item_arcname) continue - tar_file.add(directory_item.as_posix(), arcname=arcpath, recursive=False) + tar_file.add(directory_item.as_posix(), arcname=item_arcname, recursive=False) return None diff --git a/tests/test_tar.py b/tests/test_tar.py index e8e5f6e..d748ba7 100644 --- a/tests/test_tar.py +++ b/tests/test_tar.py @@ -8,12 +8,11 @@ import time from dataclasses import dataclass from pathlib import Path, PurePath -from unittest.mock import patch +from unittest.mock import Mock, patch import pytest from securetar import ( SecureTarFile, - _is_excluded_by_filter, _add_stream, atomic_contents_add, secure_path, @@ -48,32 +47,33 @@ def test_not_secure_path() -> None: assert [] == list(secure_path(test_list)) -def test_is_excluded_by_filter_good() -> None: +def test_file_filter(tmp_path: Path) -> None: """Test exclude filter.""" - filter_list = ["not/match", "/dev/xy"] - test_list = [ - PurePath("test.txt"), - PurePath("data/xy.blob"), - PurePath("bla/blu/ble"), - PurePath("data/../xy.blob"), - ] - - for path_object in test_list: - assert _is_excluded_by_filter(path_object, filter_list) is False - - -def test_is_exclude_by_filter_bad() -> None: - """Test exclude filter.""" - filter_list = ["*.txt", "data/*", "bla/blu/ble"] - test_list = [ - PurePath("test.txt"), - PurePath("data/xy.blob"), - PurePath("bla/blu/ble"), - PurePath("data/test_files/kk.txt"), - ] + file_filter = Mock(return_value=False) + # Prepare test folder + temp_orig = tmp_path.joinpath("orig") + fixture_data = Path(__file__).parent.joinpath("fixtures/tar_data") + shutil.copytree(fixture_data, temp_orig, symlinks=True) - for path_object in test_list: - assert _is_excluded_by_filter(path_object, filter_list) is True + # Create Tarfile + temp_tar = tmp_path.joinpath("backup.tar") + with SecureTarFile(temp_tar, "w") as tar_file: + atomic_contents_add( + tar_file, + temp_orig, + file_filter=file_filter, + arcname=".", + ) + paths = [call[1][0] for call in file_filter.mock_calls] + expected_paths = { + PurePath("."), + PurePath("README.md"), + PurePath("test_symlink"), + PurePath("test1"), + PurePath("test1/script.sh"), + } + assert len(paths) == len(expected_paths) + assert set(paths) == expected_paths @pytest.mark.parametrize("bufsize", [10240, 4 * 2**20]) @@ -90,7 +90,7 @@ def test_create_pure_tar(tmp_path: Path, bufsize: int) -> None: atomic_contents_add( tar_file, temp_orig, - excludes=[], + file_filter=lambda _: False, arcname=".", ) @@ -134,7 +134,7 @@ def test_create_encrypted_tar(tmp_path: Path, bufsize: int) -> None: atomic_contents_add( tar_file, temp_orig, - excludes=[], + file_filter=lambda _: False, arcname=".", ) @@ -193,7 +193,7 @@ def test_gzipped_tar_inside_tar(tmp_path: Path) -> None: atomic_contents_add( inner_tar_file, temp_orig, - excludes=[], + file_filter=lambda _: False, arcname=".", ) @@ -271,7 +271,7 @@ def test_gzipped_tar_inside_tar_failure(tmp_path: Path) -> None: atomic_contents_add( inner_tar_file, temp_orig, - excludes=[], + file_filter=lambda _: False, arcname=".", ) raise ValueError("Test") @@ -336,7 +336,7 @@ def test_encrypted_gzipped_tar_inside_tar(tmp_path: Path, bufsize: int) -> None: atomic_contents_add( inner_tar_file, temp_orig, - excludes=[], + file_filter=lambda _: False, arcname=".", )