[DAR-3513][External] Addition of the item_merge_mode push argument #920

Closed · wants to merge 5 commits
1 change: 1 addition & 0 deletions darwin/cli.py
@@ -126,6 +126,7 @@ def _run(args: Namespace, parser: ArgumentParser) -> None:
args.extract_views,
args.preserve_folders,
args.verbose,
args.item_merge_mode,
)
# Remove a project (remotely)
elif args.action == "remove":
10 changes: 10 additions & 0 deletions darwin/cli_functions.py
@@ -656,6 +656,7 @@ def upload_data(
extract_views: bool = False,
preserve_folders: bool = False,
verbose: bool = False,
item_merge_mode: Optional[str] = None,
) -> None:
"""
Uploads the provided files to the remote dataset.
@@ -684,6 +685,14 @@
Specify whether or not to preserve folder paths when uploading.
verbose : bool
Specify whether to have full traces print when uploading files or not.
item_merge_mode : Optional[str]
If set, each file path passed to `files_to_upload` behaves as follows:
- Paths pointing directly to individual files are ignored
- Paths pointing to folders of files will be uploaded according to the following mode rules.
Note that folders will not be recursively searched, so only files in the first level of the folder will be uploaded:
- "slots": Each file in the folder will be uploaded to a different slot of the same item.
- "series": All `.dcm` files in the folder will be concatenated into a single slot. All other files are ignored.
- "channels": Each file in the folder will be uploaded to a different channel of the same item.
"""
client: Client = _load_client()
try:
@@ -773,6 +782,7 @@ def file_upload_callback(
preserve_folders=preserve_folders,
progress_callback=progress_callback,
file_upload_callback=file_upload_callback,
item_merge_mode=item_merge_mode,
)
console = Console(theme=_console_theme())
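
For orientation, a minimal usage sketch of the new argument end to end (not part of this diff). The team/dataset identifier and folder paths are placeholders, and the client construction assumes the usual darwin-py entry points rather than anything introduced by this PR; each entry in files_to_upload must be a folder, since individual file paths are ignored when item_merge_mode is set.

from darwin.client import Client

# Placeholder identifier and paths (hypothetical).
client = Client.local()  # assumes an API key is already configured locally
dataset = client.get_remote_dataset("my-team/my-dataset")

dataset.push(
    files_to_upload=["./scans/case_001", "./scans/case_002"],
    item_merge_mode="slots",  # one slot per first-level file in each folder
    preserve_folders=False,   # combining this with item_merge_mode raises TypeError
)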

1 change: 1 addition & 0 deletions darwin/dataset/remote_dataset.py
@@ -138,6 +138,7 @@ def push(
preserve_folders: bool = False,
progress_callback: Optional[ProgressCallback] = None,
file_upload_callback: Optional[FileUploadCallback] = None,
item_merge_mode: Optional[str] = None,
) -> UploadHandler:
pass

206 changes: 172 additions & 34 deletions darwin/dataset/remote_dataset_v2.py
@@ -1,4 +1,5 @@
import json
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
@@ -18,7 +19,9 @@
from darwin.dataset.release import Release
from darwin.dataset.upload_manager import (
FileUploadCallback,
ItemMergeMode,
LocalFile,
MultiFileItem,
ProgressCallback,
UploadHandler,
UploadHandlerV2,
@@ -166,14 +169,16 @@ def push(
preserve_folders: bool = False,
progress_callback: Optional[ProgressCallback] = None,
file_upload_callback: Optional[FileUploadCallback] = None,
item_merge_mode: Optional[str] = None,
) -> UploadHandler:
"""
Uploads a local dataset (images ONLY) in the datasets directory.

Parameters
----------
files_to_upload : Optional[List[Union[PathLike, LocalFile]]]
List of files to upload. Those can be folders.
List of files to upload. These can be folders.
If `item_merge_mode` is set, these must be folders.
blocking : bool, default: True
If False, the dataset is not uploaded and a generator function is returned instead.
multi_threaded : bool, default: True
@@ -188,7 +193,7 @@
extract_views: bool, default: False
When the uploading file is a volume, specify whether it's going to be split into orthogonal views.
files_to_exclude : Optional[PathLike]], default: None
Optional list of files to exclude from the file scan. Those can be folders.
Optional list of files to exclude from the file scan. These can be folders.
path: Optional[str], default: None
Optional path to store the files in.
preserve_folders : bool, default: False
@@ -197,11 +202,18 @@
Optional callback, called every time the progress of an uploading file is reported.
file_upload_callback: Optional[FileUploadCallback], default: None
Optional callback, called every time a file chunk is uploaded.

item_merge_mode : Optional[str]
If set, each file path passed to `files_to_upload` behaves as follows:
- Paths pointing directly to individual files are ignored
- Paths pointing to folders of files will be uploaded according to the following mode rules.
Note that folders will not be recursively searched, so only files in the first level of the folder will be uploaded:
- "slots": Each file in the folder will be uploaded to a different slot of the same item.
- "series": All `.dcm` files in the folder will be concatenated into a single slot. All other files are ignored.
- "channels": Each file in the folder will be uploaded to a different channel of the same item.
Returns
-------
handler : UploadHandler
Class for handling uploads, progress and error messages.
Class for handling uploads, progress and error messages.

Raises
------
@@ -216,44 +228,43 @@
if files_to_upload is None:
raise ValueError("No files or directory specified.")

if item_merge_mode:
try:
ItemMergeMode(item_merge_mode)
except ValueError:
raise ValueError(
f"Invalid item merge mode: {item_merge_mode}. Valid options are: 'slots', 'series', 'channels"
)

if item_merge_mode and preserve_folders:
raise TypeError(
"`item_merge_mode` does not support preserving local file structures with `preserve_folders` or `--folders`"
)

# Direct file paths
uploading_files = [
item for item in files_to_upload if isinstance(item, LocalFile)
]

# Folder paths
search_files = [
item for item in files_to_upload if not isinstance(item, LocalFile)
]

generic_parameters_specified = (
path is not None or fps != 0 or as_frames is not False
)
if uploading_files and generic_parameters_specified:
raise ValueError("Cannot specify a path when uploading a LocalFile object.")

for found_file in find_files(search_files, files_to_exclude=files_to_exclude):
local_path = path
if preserve_folders:
source_files = [
source_file
for source_file in search_files
if is_relative_to(found_file, source_file)
]
if source_files:
local_path = str(
found_file.relative_to(source_files[0]).parent.as_posix()
)
uploading_files.append(
LocalFile(
found_file,
fps=fps,
as_frames=as_frames,
extract_views=extract_views,
path=local_path,
)
if item_merge_mode:
uploading_files = _find_files_to_upload_merging(
search_files, files_to_exclude, item_merge_mode
)

if not uploading_files:
raise ValueError(
"No files to upload, check your path, exclusion filters and resume flag"
else:
uploading_files = _find_files_to_upload_no_merging(
search_files,
files_to_exclude,
path,
fps,
as_frames,
extract_views,
preserve_folders,
uploading_files,
)

handler = UploadHandlerV2(self, uploading_files)
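
The ItemMergeMode enum used for validation above is imported from darwin.dataset.upload_manager and is not part of this diff; judging by the accepted values in the error message, a rough sketch of its likely shape is:

from enum import Enum

class ItemMergeMode(Enum):
    # Assumed member names; only the string values are confirmed by this PR.
    SLOTS = "slots"
    SERIES = "series"
    CHANNELS = "channels"

# ItemMergeMode("slots") succeeds, while ItemMergeMode("stacks") raises ValueError,
# which push() catches and re-raises with the friendlier message above.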
@@ -842,3 +853,130 @@ def register_multi_slotted(
print(f" - {item}")
print(f"Reistration complete. Check your items in the dataset: {self.slug}")
return results


def _find_files_to_upload_merging(
search_files: List[PathLike],
files_to_exclude: List[PathLike],
item_merge_mode: str,
) -> List[MultiFileItem]:
"""
Finds files to upload as either:
- Multi-slotted items
- Multi-channel items
- Single-slotted items containing multiple `.dcm` files

Does not search each directory recursively, only considers files in the first level of each directory.

Parameters
----------
search_files : List[PathLike]
List of directories to search for files.
files_to_exclude : List[PathLike]
List of files to exclude from the file scan.
item_merge_mode : str
Mode to merge the files in the folders. Valid options are: 'slots', 'series', 'channels'.

Returns
-------
List[MultiFileItem]
List of files to upload.
"""
multi_file_items = []
for directory in search_files:
files_in_directory = list(
find_files(
[directory],
files_to_exclude=files_to_exclude,
recursive=False,
sort=True,
)
)
if not files_in_directory:
print(
f"Warning: There are no uploading files in the first level of {directory}, skipping"
)
continue
multi_file_items.append(
MultiFileItem(
Path(directory), files_in_directory, ItemMergeMode(item_merge_mode)
)
)
if not multi_file_items:
raise ValueError(
"No valid folders to upload after searching the passed directories for files"
)
return multi_file_items
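
A short illustration of how this helper behaves, assuming a hypothetical local layout (the directory names and files below are placeholders, not from this PR):

# scans/case_001/ contains 0001.dcm, 0002.dcm and report.pdf; scans/case_002/ is empty.
# case_002 is skipped with a warning, and one MultiFileItem is built for case_001.
# With "series", only the .dcm files end up in the single slot; report.pdf is ignored,
# per the docstring of the push() argument above.
multi_file_items = _find_files_to_upload_merging(
    search_files=["scans/case_001", "scans/case_002"],
    files_to_exclude=[],
    item_merge_mode="series",
)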


def _find_files_to_upload_no_merging(
search_files: List[PathLike],
files_to_exclude: List[PathLike],
path: Optional[str],
fps: int,
as_frames: bool,
extract_views: bool,
preserve_folders: bool,
uploading_files: List[LocalFile],
) -> List[LocalFile]:
"""
Finds files to upload as single-slotted dataset items. Recursively searches the passed directories for files.

Parameters
----------
search_files : List[PathLike]
List of directories to search for files.
files_to_exclude : Optional[List[PathLike]]
List of files to exclude from the file scan.
path : Optional[str]
Path to store the files in.
fps: int
When uploading video files, specify the framerate.
as_frames: bool
When uploading video files, specify whether to upload as a list of frames.
extract_views: bool
When uploading volume files, specify whether to split into orthogonal views.
preserve_folders: bool
Specify whether or not to preserve folder paths when uploading.
uploading_files : List[LocalFile]
List of files to upload.

Returns
-------
List[LocalFile]
List of files to upload.
"""
generic_parameters_specified = (
path is not None or fps != 0 or as_frames is not False
)
if uploading_files and generic_parameters_specified:
raise ValueError("Cannot specify a path when uploading a LocalFile object.")

for found_file in find_files(search_files, files_to_exclude=files_to_exclude):
local_path = path
if preserve_folders:
source_files = [
source_file
for source_file in search_files
if is_relative_to(found_file, source_file)
]
if source_files:
local_path = str(
found_file.relative_to(source_files[0]).parent.as_posix()
)
uploading_files.append(
LocalFile(
found_file,
fps=fps,
as_frames=as_frames,
extract_views=extract_views,
path=local_path,
)
)

if not uploading_files:
raise ValueError(
"No files to upload, check your path, exclusion filters and resume flag"
)

return uploading_files
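
And a short illustration of the non-merging path with preserve_folders, again on a hypothetical layout (placeholder paths):

# images/train/a.jpg and images/val/b.jpg exist locally.
# With preserve_folders=True, each LocalFile keeps its parent folder relative to
# the search root as its remote path ("train" and "val" respectively).
local_files = _find_files_to_upload_no_merging(
    search_files=["images"],
    files_to_exclude=[],
    path=None,
    fps=0,
    as_frames=False,
    extract_views=False,
    preserve_folders=True,
    uploading_files=[],  # no LocalFile objects were passed directly
)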