diff --git a/darwin/cli.py b/darwin/cli.py
index bf493341a..c48f55e38 100644
--- a/darwin/cli.py
+++ b/darwin/cli.py
@@ -126,6 +126,7 @@ def _run(args: Namespace, parser: ArgumentParser) -> None:
                 args.extract_views,
                 args.preserve_folders,
                 args.verbose,
+                args.item_merge_mode,
             )
         # Remove a project (remotely)
         elif args.action == "remove":
diff --git a/darwin/cli_functions.py b/darwin/cli_functions.py
index 5d01d7a9d..9f67fc1b7 100644
--- a/darwin/cli_functions.py
+++ b/darwin/cli_functions.py
@@ -656,6 +656,7 @@ def upload_data(
     extract_views: bool = False,
     preserve_folders: bool = False,
     verbose: bool = False,
+    item_merge_mode: Optional[str] = None,
 ) -> None:
     """
     Uploads the provided files to the remote dataset.
@@ -684,6 +685,14 @@ def upload_data(
         Specify whether or not to preserve folder paths when uploading.
     verbose : bool
         Specify whether to have full traces print when uploading files or not.
+    item_merge_mode : Optional[str]
+        If set, each file path passed to `files_to_upload` behaves as follows:
+        - Paths pointing directly to individual files are ignored
+        - Paths pointing to folders of files will be uploaded according to the following mode rules.
+          Note that folders will not be recursively searched, so only files in the first level of the folder will be uploaded:
+            - "slots": Each file in the folder will be uploaded to a different slot of the same item.
+            - "series": All `.dcm` files in the folder will be concatenated into a single slot. All other files are ignored.
+            - "channels": Each file in the folder will be uploaded to a different channel of the same item.
     """
     client: Client = _load_client()
     try:
@@ -773,6 +782,7 @@ def file_upload_callback(
             preserve_folders=preserve_folders,
             progress_callback=progress_callback,
             file_upload_callback=file_upload_callback,
+            item_merge_mode=item_merge_mode,
         )
 
         console = Console(theme=_console_theme())
diff --git a/darwin/dataset/remote_dataset.py b/darwin/dataset/remote_dataset.py
index 154c26c85..34f642912 100644
--- a/darwin/dataset/remote_dataset.py
+++ b/darwin/dataset/remote_dataset.py
@@ -138,6 +138,7 @@ def push(
         preserve_folders: bool = False,
         progress_callback: Optional[ProgressCallback] = None,
         file_upload_callback: Optional[FileUploadCallback] = None,
+        item_merge_mode: Optional[str] = None,
     ) -> UploadHandler:
         pass
 
diff --git a/darwin/dataset/remote_dataset_v2.py b/darwin/dataset/remote_dataset_v2.py
index 59f87b320..fba60c8ff 100644
--- a/darwin/dataset/remote_dataset_v2.py
+++ b/darwin/dataset/remote_dataset_v2.py
@@ -1,4 +1,5 @@
 import json
+from pathlib import Path
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -18,7 +19,9 @@
 from darwin.dataset.release import Release
 from darwin.dataset.upload_manager import (
     FileUploadCallback,
+    ItemMergeMode,
     LocalFile,
+    MultiFileItem,
     ProgressCallback,
     UploadHandler,
     UploadHandlerV2,
@@ -166,6 +169,7 @@ def push(
         preserve_folders: bool = False,
         progress_callback: Optional[ProgressCallback] = None,
         file_upload_callback: Optional[FileUploadCallback] = None,
+        item_merge_mode: Optional[str] = None,
     ) -> UploadHandler:
         """
         Uploads a local dataset (images ONLY) in the datasets directory.
@@ -173,7 +177,8 @@ def push(
         Parameters
         ----------
         files_to_upload : Optional[List[Union[PathLike, LocalFile]]]
-            List of files to upload. Those can be folders.
+            List of files to upload. These can be folders.
+            If `item_merge_mode` is set, these must be folders.
         blocking : bool, default: True
             If False, the dataset is not uploaded and a generator function is returned instead.
         multi_threaded : bool, default: True
@@ -188,7 +193,7 @@ def push(
         extract_views: bool, default: False
            When the uploading file is a volume, specify whether it's going to be split into orthogonal views.
         files_to_exclude : Optional[PathLike]], default: None
-            Optional list of files to exclude from the file scan. Those can be folders.
+            Optional list of files to exclude from the file scan. These can be folders.
         path: Optional[str], default: None
            Optional path to store the files in.
         preserve_folders : bool, default: False
@@ -197,11 +202,18 @@ def push(
            Optional callback, called every time the progress of an uploading files is reported.
         file_upload_callback: Optional[FileUploadCallback], default: None
            Optional callback, called every time a file chunk is uploaded.
-
+        item_merge_mode : Optional[str]
+            If set, each file path passed to `files_to_upload` behaves as follows:
+            - Paths pointing directly to individual files are ignored
+            - Paths pointing to folders of files will be uploaded according to the below mode rules.
+              Note that folders will not be recursively searched, so only files in the first level of the folder will be uploaded:
+                - "slots": Each file in the folder will be uploaded to a different slot of the same item.
+                - "series": All `.dcm` files in the folder will be concatenated into a single slot. All other files are ignored.
+                - "channels": Each file in the folder will be uploaded to a different channel of the same item.
         Returns
         -------
         handler : UploadHandler
-            Class for handling uploads, progress and error messages. 
+            Class for handling uploads, progress and error messages.
 
         Raises
         ------
@@ -216,44 +228,43 @@ def push(
         if files_to_upload is None:
             raise ValueError("No files or directory specified.")
 
+        if item_merge_mode:
+            try:
+                ItemMergeMode(item_merge_mode)
+            except ValueError:
+                raise ValueError(
+                    f"Invalid item merge mode: {item_merge_mode}. Valid options are: 'slots', 'series', 'channels'"
+                )
+
+        if item_merge_mode and preserve_folders:
+            raise TypeError(
+                "`item_merge_mode` does not support preserving local file structures with `preserve_folders` or `--folders`"
+            )
+
+        # Direct file paths
         uploading_files = [
             item for item in files_to_upload if isinstance(item, LocalFile)
         ]
+
+        # Folder paths
         search_files = [
             item for item in files_to_upload if not isinstance(item, LocalFile)
         ]
 
-        generic_parameters_specified = (
-            path is not None or fps != 0 or as_frames is not False
-        )
-        if uploading_files and generic_parameters_specified:
-            raise ValueError("Cannot specify a path when uploading a LocalFile object.")
-
-        for found_file in find_files(search_files, files_to_exclude=files_to_exclude):
-            local_path = path
-            if preserve_folders:
-                source_files = [
-                    source_file
-                    for source_file in search_files
-                    if is_relative_to(found_file, source_file)
-                ]
-                if source_files:
-                    local_path = str(
-                        found_file.relative_to(source_files[0]).parent.as_posix()
-                    )
-            uploading_files.append(
-                LocalFile(
-                    found_file,
-                    fps=fps,
-                    as_frames=as_frames,
-                    extract_views=extract_views,
-                    path=local_path,
-                )
+        if item_merge_mode:
+            uploading_files = _find_files_to_upload_merging(
+                search_files, files_to_exclude, item_merge_mode
             )
-
-        if not uploading_files:
-            raise ValueError(
-                "No files to upload, check your path, exclusion filters and resume flag"
+        else:
+            uploading_files = _find_files_to_upload_no_merging(
+                search_files,
+                files_to_exclude,
+                path,
+                fps,
+                as_frames,
+                extract_views,
+                preserve_folders,
+                uploading_files,
             )
 
         handler = UploadHandlerV2(self, uploading_files)
@@ -842,3 +853,130 @@ def register_multi_slotted(
             print(f" - {item}")
         print(f"Reistration complete. Check your items in the dataset: {self.slug}")
         return results
+
+
+def _find_files_to_upload_merging(
+    search_files: List[PathLike],
+    files_to_exclude: List[PathLike],
+    item_merge_mode: str,
+) -> List[MultiFileItem]:
+    """
+    Finds files to upload as either:
+    - Multi-slotted items
+    - Multi-channel items
+    - Single-slotted items containing multiple `.dcm` files
+
+    Does not search each directory recursively, only considers files in the first level of each directory.
+
+    Parameters
+    ----------
+    search_files : List[PathLike]
+        List of directories to search for files.
+    files_to_exclude : List[PathLike]
+        List of files to exclude from the file scan.
+    item_merge_mode : str
+        Mode to merge the files in the folders. Valid options are: 'slots', 'series', 'channels'.
+
+    Returns
+    -------
+    List[MultiFileItem]
+        List of files to upload.
+    """
+    multi_file_items = []
+    for directory in search_files:
+        files_in_directory = list(
+            find_files(
+                [directory],
+                files_to_exclude=files_to_exclude,
+                recursive=False,
+                sort=True,
+            )
+        )
+        if not files_in_directory:
+            print(
+                f"Warning: There are no files to upload in the first level of {directory}, skipping"
+            )
+            continue
+        multi_file_items.append(
+            MultiFileItem(
+                Path(directory), files_in_directory, ItemMergeMode(item_merge_mode)
+            )
+        )
+    if not multi_file_items:
+        raise ValueError(
+            "No valid folders to upload after searching the passed directories for files"
+        )
+    return multi_file_items
+
+
+def _find_files_to_upload_no_merging(
+    search_files: List[PathLike],
+    files_to_exclude: List[PathLike],
+    path: Optional[str],
+    fps: int,
+    as_frames: bool,
+    extract_views: bool,
+    preserve_folders: bool,
+    uploading_files: List[LocalFile],
+) -> List[LocalFile]:
+    """
+    Finds files to upload as single-slotted dataset items.
+    Recursively searches the passed directories for files.
+
+    Parameters
+    ----------
+    search_files : List[PathLike]
+        List of directories to search for files.
+    files_to_exclude : Optional[List[PathLike]]
+        List of files to exclude from the file scan.
+    path : Optional[str]
+        Path to store the files in.
+    fps: int
+        When uploading video files, specify the framerate.
+    as_frames: bool
+        When uploading video files, specify whether to upload as a list of frames.
+    extract_views: bool
+        When uploading volume files, specify whether to split into orthogonal views.
+    preserve_folders: bool
+        Specify whether or not to preserve folder paths when uploading.
+    uploading_files : List[LocalFile]
+        List of files to upload.
+
+    Returns
+    -------
+    List[LocalFile]
+        List of files to upload.
+    """
+    generic_parameters_specified = (
+        path is not None or fps != 0 or as_frames is not False
+    )
+    if uploading_files and generic_parameters_specified:
+        raise ValueError("Cannot specify a path when uploading a LocalFile object.")
+
+    for found_file in find_files(search_files, files_to_exclude=files_to_exclude):
+        local_path = path
+        if preserve_folders:
+            source_files = [
+                source_file
+                for source_file in search_files
+                if is_relative_to(found_file, source_file)
+            ]
+            if source_files:
+                local_path = str(
+                    found_file.relative_to(source_files[0]).parent.as_posix()
+                )
+        uploading_files.append(
+            LocalFile(
+                found_file,
+                fps=fps,
+                as_frames=as_frames,
+                extract_views=extract_views,
+                path=local_path,
+            )
+        )
+
+    if not uploading_files:
+        raise ValueError(
+            "No files to upload, check your path, exclusion filters and resume flag"
+        )
+
+    return uploading_files
diff --git a/darwin/dataset/upload_manager.py b/darwin/dataset/upload_manager.py
index bcd28f8f7..7ba3fcd86 100644
--- a/darwin/dataset/upload_manager.py
+++ b/darwin/dataset/upload_manager.py
@@ -2,6 +2,7 @@
 import os
 import time
 from dataclasses import dataclass
+from enum import Enum
 from pathlib import Path
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -13,6 +14,7 @@
     Optional,
     Set,
     Tuple,
+    Union,
 )
 
 import requests
@@ -31,6 +33,12 @@
     from typing import Dict
 
 
+class ItemMergeMode(Enum):
+    SLOTS = "slots"
+    SERIES = "series"
+    CHANNELS = "channels"
+
+
 class ItemPayload:
     """
     Represents an item's payload.
@@ -186,6 +194,50 @@ def full_path(self) -> str:
         return construct_full_path(self.data["path"], self.data["filename"])
 
 
+class MultiFileItem:
+    def __init__(self, directory: Path, files: List[Path], merge_mode: ItemMergeMode):
+        self.directory = directory
+        self.name = directory.name
+        self.files = files
+        self.merge_mode = merge_mode
+        self.layout = self._create_layout()
+
+    def _create_layout(self):
+        """
+        Creates the layout to be used when uploading the files as a dataset item:
+        - For multi-slotted items: LayoutV2
+        - For series items: LayoutV2, but only with `.dcm` files
+        - For multi-channel items: LayoutV3
+
+        Raises
+        ------
+        ValueError
+            - If no DICOM files are found in the directory for `ItemMergeMode.SERIES` items
+            - If the number of files is greater than 16 for `ItemMergeMode.CHANNELS` items
+        """
+        if self.merge_mode == ItemMergeMode.SLOTS:
+            return {
+                "version": 2,
+                "slots": [str(i) for i in range(len(self.files))],
+                "type": "grid",
+            }
+        elif self.merge_mode == ItemMergeMode.SERIES:
+            self.files = [file for file in self.files if file.suffix.lower() == ".dcm"]
+            if not self.files:
+                raise ValueError("No `.dcm` files found in 1st level of directory")
+            return {
+                "version": 2,
+                "slots": [str(i) for i in range(len(self.files))],
+                "type": "grid",
+            }
+        elif self.merge_mode == ItemMergeMode.CHANNELS:
+            if len(self.files) > 16:
+                raise ValueError(
+                    f"No multi-channel item can have more than 16 files. The following directory has {len(self.files)} files: {self.directory}"
+                )
+            return {"version": 3, "slots_grid": [[[file.name for file in self.files]]]}
+
+
 class FileMonitor(object):
     """
     Monitors the progress of a :class:``BufferedReader``.
@@ -402,7 +454,11 @@ def _upload_file(
 
 
 class UploadHandlerV2(UploadHandler):
-    def __init__(self, dataset: "RemoteDataset", local_files: List[LocalFile]):
+    def __init__(
+        self,
+        dataset: "RemoteDataset",
+        local_files: Union[List[LocalFile], List[MultiFileItem]],
+    ):
         super().__init__(dataset=dataset, local_files=local_files)
 
     def _request_upload(self) -> Tuple[List[ItemPayload], List[ItemPayload]]:
diff --git a/darwin/options.py b/darwin/options.py
index b6a292394..6ac6c6717 100644
--- a/darwin/options.py
+++ b/darwin/options.py
@@ -183,6 +183,12 @@ def __init__(self) -> None:
             action="store_true",
             help="Preserve the local folder structure in the dataset.",
         )
+        parser_push.add_argument(
+            "--item-merge-mode",
+            type=str,
+            choices=["slots", "series", "channels"],
+            help="Specify the item merge mode: `slots`, `series`, or `channels`",
+        )
 
         # Remove
         parser_remove = dataset_action.add_parser(
diff --git a/darwin/utils/utils.py b/darwin/utils/utils.py
index 5f57449ad..f2cbd8d85 100644
--- a/darwin/utils/utils.py
+++ b/darwin/utils/utils.py
@@ -25,6 +25,7 @@
 import requests
 from json_stream.base import PersistentStreamingJSONList, PersistentStreamingJSONObject
 from jsonschema import validators
+from natsort import natsorted
 from requests import Response
 from rich.progress import ProgressType, track
 from upolygon import draw_polygon
@@ -216,6 +217,7 @@ def find_files(
     *,
     files_to_exclude: List[dt.PathLike] = [],
     recursive: bool = True,
+    sort: bool = False,
 ) -> List[Path]:
     """
     Retrieve a list of all files belonging to supported extensions. The exploration can be made
@@ -229,7 +231,8 @@ def find_files(
         List of files to exclude from the search.
     recursive : bool
         Flag for recursive search.
-
+    sort : bool
+        Flag for sorting the files naturally, i.e. file2.txt will come before file10.txt.
     Returns
     -------
     List[Path]
@@ -255,8 +258,12 @@ def find_files(
             raise UnsupportedFileType(path)
 
     files_to_exclude_full_paths = [str(Path(f)) for f in files_to_exclude]
-
-    return [f for f in found_files if str(f) not in files_to_exclude_full_paths]
+    filtered_files = [
+        f for f in found_files if str(f) not in files_to_exclude_full_paths
+    ]
+    if sort:
+        return natsorted(filtered_files)
+    return filtered_files
 
 
 def secure_continue_request() -> bool:
diff --git a/poetry.lock b/poetry.lock
index 9161c0ff2..a850e8c4c 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -913,6 +913,21 @@ files = [
     {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
 ]
 
+[[package]]
+name = "natsort"
+version = "8.4.0"
+description = "Simple yet flexible natural sorting in Python."
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "natsort-8.4.0-py3-none-any.whl", hash = "sha256:4732914fb471f56b5cce04d7bae6f164a592c7712e1c85f9ef585e197299521c"},
+    {file = "natsort-8.4.0.tar.gz", hash = "sha256:45312c4a0e5507593da193dedd04abb1469253b601ecaf63445ad80f0a1ea581"},
+]
+
+[package.extras]
+fast = ["fastnumbers (>=2.0.0)"]
+icu = ["PyICU (>=1.0.0)"]
+
 [[package]]
 name = "networkx"
 version = "3.1"
@@ -2238,4 +2253,4 @@ test = ["pytest", "responses"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.8.0,<3.12"
-content-hash = "6e6c0628c98652df5dd76a8d82a0f67af9ec2037388350412152d21d84fa9d57"
+content-hash = "3ea848bf4d0e5e0f22170f20321ce5d426eb79c6bc0a536b36519fd6f7c6782e"
diff --git a/pyproject.toml b/pyproject.toml
index 10c69fc3e..ef4146782 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -83,6 +83,7 @@ types-requests = "^2.28.11.8"
 upolygon = "0.1.11"
 tenacity = "8.5.0"
+natsort = "^8.4.0"
 
 [tool.poetry.extras]
 dev = ["black", "isort", "flake8", "mypy", "debugpy", "responses", "pytest", "flake8-pyproject", "pytest-rerunfailures", "ruff", "validate-pyproject"]
 medical = ["nibabel", "connected-components-3d", "scipy"]
diff --git a/tests/darwin/data/push_test_dir.zip b/tests/darwin/data/push_test_dir.zip
new file mode 100644
index 000000000..6d76d8174
Binary files /dev/null and b/tests/darwin/data/push_test_dir.zip differ
diff --git a/tests/darwin/dataset/remote_dataset_test.py b/tests/darwin/dataset/remote_dataset_test.py
index c974ace0e..72dea3d65 100644
--- a/tests/darwin/dataset/remote_dataset_test.py
+++ b/tests/darwin/dataset/remote_dataset_test.py
@@ -10,6 +10,7 @@
 import orjson as json
 import pytest
 import responses
+from natsort import natsorted
 from pydantic import ValidationError
 
 from darwin.client import Client
@@ -20,8 +21,17 @@
     download_all_images_from_annotations,
 )
 from darwin.dataset.release import Release, ReleaseStatus
-from darwin.dataset.remote_dataset_v2 import RemoteDatasetV2
-from darwin.dataset.upload_manager import LocalFile, UploadHandlerV2
+from darwin.dataset.remote_dataset_v2 import (
+    RemoteDatasetV2,
+    _find_files_to_upload_merging,
+    _find_files_to_upload_no_merging,
+)
+from darwin.dataset.upload_manager import (
+    ItemMergeMode,
+    LocalFile,
+    MultiFileItem,
+    UploadHandlerV2,
+)
 from darwin.datatypes import ManifestItem, ObjectStore, SegmentManifest
 from darwin.exceptions import UnsupportedExportFormat, UnsupportedFileType
 from darwin.item import DatasetItem
@@ -601,6 +611,14 @@ def remote_dataset(
 
 @pytest.mark.usefixtures("file_read_write_test")
 class TestPush:
+    @pytest.fixture(scope="class")
+    def setup_zip(self):
+        zip_path = Path("tests/darwin/data/push_test_dir.zip")
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with zipfile.ZipFile(zip_path, "r") as zip_ref:
+                zip_ref.extractall(tmpdir)
+            yield Path(tmpdir)
+
     def test_raises_if_files_are_not_provided(self, remote_dataset: RemoteDataset):
         with pytest.raises(ValueError):
             remote_dataset.push(None)
@@ -659,6 +677,143 @@ def test_raises_with_unsupported_files(self, remote_dataset: RemoteDataset):
         with pytest.raises(UnsupportedFileType):
             remote_dataset.push(["test.txt"])
 
+    def test_raises_if_invalid_item_merge_mode(self, remote_dataset: RemoteDataset):
+        with pytest.raises(ValueError):
+            remote_dataset.push(["path/to/dir"], item_merge_mode="invalid")
+
+    def test_raises_if_preserve_folders_with_item_merge_mode(
+        self, remote_dataset: RemoteDataset
+    ):
+        with pytest.raises(TypeError):
+            remote_dataset.push(
+                ["path/to/dir"],
+                item_merge_mode="slots",
+                preserve_folders=True,
+            )
+
+    def test_find_files_to_upload_merging_slots(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir1"
+        search_files = [base_path / "jpegs", base_path / "dicoms"]
+        multi_file_items = _find_files_to_upload_merging(search_files, [], "slots")
+        assert len(multi_file_items) == 2
+        assert all(isinstance(item, MultiFileItem) for item in multi_file_items)
+
+    def test_find_files_to_upload_merging_series(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir1"
+        search_files = [base_path / "dicoms"]
+        multi_file_items = _find_files_to_upload_merging(search_files, [], "series")
+        assert len(multi_file_items) == 1
+        assert all(isinstance(item, MultiFileItem) for item in multi_file_items)
+
+    def test_find_files_to_upload_merging_channels(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir1"
+        search_files = [base_path / "jpegs", base_path / "dicoms"]
+        multi_file_items = _find_files_to_upload_merging(search_files, [], "channels")
+        assert len(multi_file_items) == 2
+
+    def test_find_files_to_upload_merging_does_not_search_recursively(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir2"
+        search_files = [base_path / "recursive_search"]
+        multi_file_items = _find_files_to_upload_merging(search_files, [], "slots")
+        assert len(multi_file_items) == 1
+        assert len(multi_file_items[0].files) == 2
+
+    def test_find_files_to_upload_no_merging_searches_recursively(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir2"
+        search_files = [base_path / "recursive_search"]
+        local_files = _find_files_to_upload_no_merging(
+            search_files,
+            [],
+            None,
+            0,
+            False,
+            False,
+            False,
+            [],
+        )
+        assert len(local_files) == 11
+        assert all(isinstance(file, LocalFile) for file in local_files)
+
+    def test_find_files_to_upload_no_merging_no_files(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir2"
+        search_files = [base_path / "no_files_1", base_path / "no_files_2"]
+        with pytest.raises(
+            ValueError,
+            match="No files to upload, check your path, exclusion filters and resume flag",
+        ):
+            _find_files_to_upload_no_merging(
+                search_files,
+                [],
+                None,
+                0,
+                False,
+                False,
+                False,
+                [],
+            )
+
+
+class TestMultiFileItem:
+    @pytest.fixture(scope="class")
+    def setup_zip(self):
+        zip_path = Path("tests/darwin/data/push_test_dir.zip")
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with zipfile.ZipFile(zip_path, "r") as zip_ref:
+                zip_ref.extractall(tmpdir)
+            yield Path(tmpdir)
+
+    def test_create_multi_file_item_slots(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir1" / "jpegs"
+        files = natsorted(list(base_path.glob("*")))
+        item = MultiFileItem(base_path, files, merge_mode=ItemMergeMode.SLOTS)
+        assert len(item.files) == 6
+        assert item.name == "jpegs"
+        assert item.layout == {
+            "version": 2,
+            "slots": ["0", "1", "2", "3", "4", "5"],
+            "type": "grid",
+        }
+
+    def test_create_multi_file_item_series(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir1" / "dicoms"
+        files = natsorted(list(base_path.glob("*")))
+        item = MultiFileItem(base_path, files, merge_mode=ItemMergeMode.SERIES)
+        assert len(item.files) == 6
+        assert item.name == "dicoms"
+        assert item.layout == {
+            "version": 2,
+            "slots": ["0", "1", "2", "3", "4", "5"],
+            "type": "grid",
+        }
+
+    def test_create_multi_file_item_channels(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir1" / "jpegs"
+        files = natsorted(list(base_path.glob("*")))
+        item = MultiFileItem(base_path, files, merge_mode=ItemMergeMode.CHANNELS)
+        assert len(item.files) == 6
+        assert item.name == "jpegs"
+        assert item.layout == {
+            "version": 3,
+            "slots_grid": [[["1.JPG", "3.JPG", "4.jpg", "5.JPG", "6.jpg", "7.JPG"]]],
+        }
+
+    def test_create_series_no_valid_files(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir1" / "jpegs"
+        files = natsorted(list(base_path.glob("*")))
+        with pytest.raises(
+            ValueError, match="No `.dcm` files found in 1st level of directory"
+        ):
+            MultiFileItem(base_path, files, merge_mode=ItemMergeMode.SERIES)
+
+    def test_create_channels_too_many_files(self, setup_zip):
+        base_path = setup_zip / "push_test_dir" / "dir2" / "too_many_channels"
+        files = natsorted(list(base_path.glob("*")))
+        with pytest.raises(
+            ValueError,
+            match=r"No multi-channel item can have more than 16 files. The following directory has 17 files: .*",
+        ):
+            MultiFileItem(base_path, files, merge_mode=ItemMergeMode.CHANNELS)
+
 
 @pytest.mark.usefixtures("file_read_write_test")
 class TestPull:
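
Example usage of the new `item_merge_mode` option (a minimal sketch; it assumes an API key has already been configured locally, and the team/dataset slug and folder path below are illustrative):

from darwin.client import Client

# Assumes `darwin authenticate` has already stored an API key on this machine.
client = Client.local()
dataset = client.get_remote_dataset("my-team/my-dataset")  # illustrative slug

# Upload every file in the first level of the folder as separate slots of one item;
# "series" and "channels" work the same way but apply their own merge rules.
dataset.push(["path/to/scan_folder"], item_merge_mode="slots")

# Equivalent CLI invocation:
#   darwin dataset push my-team/my-dataset path/to/scan_folder --item-merge-mode slots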