Refactor transfer_complete_datasets for one transfer job #340

Merged · 7 commits · Sep 11, 2024
1 change: 1 addition & 0 deletions changelog/340.bugfix.rst

@@ -0,0 +1 @@
+Fix a bug with `dkist.net.transfer_complete_datasets` where a length one ``UnifiedResponse`` would cause an error.
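
A minimal sketch of the fixed failure mode, assuming a query that matches exactly one dataset (the dataset ID below is hypothetical; attribute names as in `dkist.net.attrs`):

```python
import dkist.net  # registers the "dkist" client with Fido
from sunpy.net import Fido, attrs as a
from dkist.net import transfer_complete_datasets

# A search matching a single dataset returns a length-one UnifiedResponse,
# which previously raised an error when passed straight through.
results = Fido.search(a.dkist.Dataset("AGLKO"))  # hypothetical dataset ID
transfer_complete_datasets(results, path="~/dkist")  # requires Globus login
```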
1 change: 1 addition & 0 deletions changelog/340.feature.rst

@@ -0,0 +1 @@
+`dkist.net.transfer_complete_datasets` will now only create one Globus task for all datasets it downloads.
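
In practice this means a call spanning several datasets now submits a single transfer, sketched below with hypothetical dataset IDs:

```python
from dkist.net import transfer_complete_datasets

# Previously each dataset became its own Globus task; now both datasets
# are bundled into one task and a list of destination paths is returned.
paths = transfer_complete_datasets(["AGLKO", "BKEWK"], path="~/dkist")
print(paths)  # one destination directory per dataset
```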
14 changes: 14 additions & 0 deletions dkist/net/globus/tests/test_transfer.py

@@ -107,6 +107,20 @@ def test_start_transfer_src_base(mocker, transfer_client, mock_endpoints):
     assert f"{os.path.sep}b{os.path.sep}" + filepath.name == tfr["destination_path"]


+def test_start_transfer_multiple_paths(mocker, transfer_client, mock_endpoints):
+    submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
+                               return_value={"task_id": "task_id"})
+    mocker.patch("globus_sdk.TransferClient.get_submission_id",
+                 return_value={"value": "wibble"})
+    file_list = list(map(Path, ["/a/name.fits", "/a/name2.fits"]))
+    dst_list = list(map(Path, ["/aplace/newname.fits", "/anotherplace/newname2.fits"]))
+    start_transfer_from_file_list("a", "b", dst_list, file_list)
+    transfer_manifest = submit_mock.call_args_list[0][0][0]["DATA"]
+
+    for filepath, tfr in zip(dst_list, transfer_manifest):
+        assert str(filepath) == tfr["destination_path"]
+
+
 def test_process_event_list(transfer_client, mock_task_event_list):
     (events,
      json_events,
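
For reference, the new test digs the manifest out of the mocked ``submit_transfer`` call. A sketch of the assumed shape of that manifest (``globus_sdk.TransferData`` stores its items under the ``DATA`` key):

```python
# Assumed shape of the submitted manifest the test iterates over; each
# add_item() call appends one "transfer_item" document to "DATA".
expected_manifest = [
    {"DATA_TYPE": "transfer_item",
     "source_path": "/a/name.fits",
     "destination_path": "/aplace/newname.fits"},
    {"DATA_TYPE": "transfer_item",
     "source_path": "/a/name2.fits",
     "destination_path": "/anotherplace/newname2.fits"},
]
```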
55 changes: 32 additions & 23 deletions dkist/net/globus/transfer.py

@@ -1,7 +1,6 @@
 """
 Functions and helpers for orchestrating and monitoring transfers using Globus.
 """
-import copy
 import json
 import time
 import pathlib

@@ -19,43 +18,50 @@
 __all__ = ["watch_transfer_progress", "start_transfer_from_file_list"]


-def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, file_list,
-                                  src_base_path=None, recursive=False, label=None):
+def start_transfer_from_file_list(
+    src_endpoint: str,
+    dst_endpoint: str,
+    dst_base_path: PathLike,
+    file_list: list[PathLike],
+    src_base_path: PathLike = None,
+    recursive: bool | list[bool] = False,
+    label: str = None
+) -> str:
     """
     Start a new transfer task for a list of files.

     Parameters
     ----------
-    src_endpoint : `str`
+    src_endpoint
         The endpoint to copy file from. Can be any identifier accepted by
         `~dkist.net.globus.get_endpoint_id`.

-    dst_endpoint : `str`
+    dst_endpoint
         The endpoint to copy file to. Can be any identifier accepted by
         `~dkist.net.globus.get_endpoint_id`.

-    dst_base_path : `~pathlib.Path`
+    dst_base_path
         The destination path, must be accessible from the endpoint, will be
         created if it does not exist.

-    file_list : `list`
+    file_list
         The list of file paths on the ``src_endpoint`` to transfer to the ``dst_endpoint``.

-    src_base_path : `~pathlib.Path`, optional
+    src_base_path
         The path prefix on the items in ``file_list`` to be stripped before
         copying to ``dst_base_path``. i.e. if the file path in ``path_list`` is
         ``/spam/eggs/filename.fits`` and ``src_base_path`` is ``/spam`` the
         ``eggs/`` folder will be copied to ``dst_base_path``. By default only
         the filenames are kept, and none of the directories.

-    recursive : `bool` or `list` of `bool`, optional
+    recursive
         Controls if the path in ``file_list`` is added to the Globus task with
         the recursive flag or not.
         This should be `True` if the element of ``file_list`` is a directory.
         If you need to set this per-item in ``file_list`` it should be a `list`
         of `bool` of equal length as ``file_list``.

-    label : `str`
+    label
         Label for the Globus transfer. If None then a default will be used.

     Returns

@@ -87,17 +93,20 @@ def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, file_list,
                                                 sync_level="checksum",
                                                 verify_checksum=True)

-    dst_base_path = pathlib.Path(dst_base_path)
-    src_file_list = copy.copy(file_list)
-    dst_file_list = []
-    for src_file in src_file_list:
-        # If a common prefix is not specified just copy the filename
-        if not src_base_path:
-            src_filepath = src_file.name
-        else:
-            # Otherwise use the filepath relative to the base path
-            src_filepath = src_file.relative_to(src_base_path)
-        dst_file_list.append(dst_base_path / src_filepath)
+    src_file_list = file_list
+    if not isinstance(dst_base_path, (list, tuple)):
+        dst_base_path = pathlib.Path(dst_base_path)
+        dst_file_list = []
+        for src_file in src_file_list:
+            # If a common prefix is not specified just copy the filename or last directory
+            if not src_base_path:
+                src_filepath = src_file.name
+            else:
+                # Otherwise use the filepath relative to the base path
+                src_filepath = src_file.relative_to(src_base_path)
+            dst_file_list.append(dst_base_path / src_filepath)
+    else:
+        dst_file_list = dst_base_path

     for src_file, dst_file, rec in zip(src_file_list, dst_file_list, recursive):
         transfer_manifest.add_item(str(src_file), str(dst_file), recursive=rec)

@@ -265,12 +274,12 @@ def watch_transfer_progress(task_id, tfr_client, poll_interval=5,


 def _orchestrate_transfer_task(file_list: list[PathLike],
                                recursive: list[bool],
-                               destination_path: PathLike = "/~/",
+                               destination_path: PathLike | list[PathLike] = "/~/",
                                destination_endpoint: str = None,
                                *,
                                progress: bool | Literal["verbose"] = True,
                                wait: bool = True,
-                               label=None):
+                               label: str = None):
     """
     Transfer the files given in file_list to the path on ``destination_endpoint``.
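
Taken together, ``dst_base_path`` may now be a list giving one explicit destination per source file, and ``recursive`` may be set per item. A usage sketch (endpoint names and paths hypothetical):

```python
from pathlib import Path
from dkist.net.globus.transfer import start_transfer_from_file_list

# One Globus task transferring a whole dataset directory plus a single
# extra file; recursive is given per-item to match file_list.
task_id = start_transfer_from_file_list(
    "source-endpoint", "destination-endpoint",  # hypothetical endpoints
    dst_base_path=[Path("/data/AGLKO"), Path("/data/notes/readme.txt")],
    file_list=[Path("/bucket/pid_1_123/AGLKO"), Path("/bucket/readme.txt")],
    recursive=[True, False],
)
```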
104 changes: 60 additions & 44 deletions dkist/net/helpers.py

@@ -13,6 +13,7 @@
 from sunpy.net.base_client import QueryResponseRow
 from sunpy.net.fido_factory import UnifiedResponse

+from dkist.net import conf
 from dkist.net.attrs import Dataset
 from dkist.net.client import DKISTClient, DKISTQueryResponseTable
 from dkist.net.globus.transfer import _orchestrate_transfer_task

@@ -35,6 +36,25 @@ def _get_dataset_inventory(dataset_id: str | Iterable[str]) -> DKISTQueryResponseTable:
     return results


+def _get_globus_path_for_dataset(dataset: QueryResponseRow):
+    """
+    Given a dataset ID get the directory on the source endpoint.
+    """
+    if not isinstance(dataset, QueryResponseRow):
+        raise TypeError("Input should be a single row of dataset inventory.")
+
+    # At this point we only have one dataset, and it should be a row not a table
+    dataset_id = dataset["Dataset ID"]
+    proposal_id = dataset["Primary Proposal ID"]
+    bucket = dataset["Storage Bucket"]
+
+    return Path(conf.dataset_path.format(
+        datasetId=dataset_id,
+        primaryProposalId=proposal_id,
+        bucket=bucket
+    ))
+
+
 def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow | DKISTQueryResponseTable | UnifiedResponse,
                                path: PathLike = "/~/",
                                destination_endpoint: str = None,

@@ -52,14 +72,13 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
         ``Fido.search``.

     path
-        The path to save the data in, must be accessible by the Globus
-        endpoint.
-        The default value is ``/~/``.
-        It is possible to put placeholder strings in the path with any key
-        from the dataset inventory dictionary which can be accessed as
-        ``ds.meta['inventory']``. An example of this would be
-        ``path="~/dkist/{datasetId}"`` to save the files in a folder named
-        with the dataset ID being downloaded.
+        The path to save the data in, must be accessible by the Globus endpoint.
+        The default value is ``/~/``. It is possible to put placeholder strings
+        in the path with any key from inventory which can be shown with
+        :meth:`dkist.utils.inventory.path_format_keys`. An example of this
+        would be ``path="~/dkist/{primary_proposal_id}"`` to save the files in a
+        folder named for the proposal id. **Note** that ``{dataset_id}`` is
+        always added to the path if it is not already the last element.

     destination_endpoint
         A unique specifier for a Globus endpoint. If `None` a local

@@ -87,18 +106,22 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
         The path to the directories containing the dataset(s) on the destination endpoint.

     """
-    # Avoid circular import
-    from dkist.net import conf
+    path = Path(path)
+    if path.parts[-1] != "{dataset_id}":
+        path = path / "{dataset_id}"

-    if isinstance(datasets, (DKISTQueryResponseTable, QueryResponseRow)):
-        # These we don't have to pre-process
+    if isinstance(datasets, DKISTQueryResponseTable):
+        # This we don't have to pre-process
         pass

+    elif isinstance(datasets, QueryResponseRow):
+        datasets = DKISTQueryResponseTable(datasets)
+
     elif isinstance(datasets, UnifiedResponse):
+        # If we have a UnifiedResponse object, it could contain one or more dkist tables.
+        # Stack them and then treat them like we were passed a single table with many rows.
         datasets = datasets["dkist"]
-        if len(datasets) > 1:
+        if isinstance(datasets, UnifiedResponse) and len(datasets) > 1:
             datasets = table.vstack(datasets, metadata_conflicts="silent")

     elif isinstance(datasets, str) or all(isinstance(d, str) for d in datasets):

@@ -109,45 +132,38 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
         # Anything else, error
         raise TypeError(f"{type(datasets)} is of an unknown type, it should be search results or one or more dataset IDs.")

-    if not isinstance(datasets, QueryResponseRow) and len(datasets) > 1:
-        paths = []
-        for record in datasets:
-            paths.append(transfer_complete_datasets(record,
-                                                    path=path,
-                                                    destination_endpoint=destination_endpoint,
-                                                    progress=progress,
-                                                    wait=wait,
-                                                    label=label))
-        return paths
-
-    # ensure a length one table is a row
-    if len(datasets) == 1:
-        datasets = datasets[0]
-
-    # At this point we only have one dataset, and it should be a row not a table
-    dataset = datasets
-    dataset_id = dataset["Dataset ID"]
-    proposal_id = dataset["Primary Proposal ID"]
-    bucket = dataset["Storage Bucket"]
-
-    path_inv = path_format_inventory(dict(dataset))
-    destination_path = Path(path.format(**path_inv))
-
-    file_list = [Path(conf.dataset_path.format(
-        datasetId=dataset_id,
-        primaryProposalId=proposal_id,
-        bucket=bucket
-    ))]
-
-    now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
-    label = f"DKIST Python Tools - {now} {dataset_id}" if label is None else label
+    source_paths = []
+    for record in datasets:
+        source_paths.append(_get_globus_path_for_dataset(record))
+
+    destination_paths = []
+    for dataset in datasets:
+        dataset_id = dataset["Dataset ID"]
+        proposal_id = dataset["Primary Proposal ID"]
+        bucket = dataset["Storage Bucket"]
+
+        path_inv = path_format_inventory(dict(dataset))
+        destination_paths.append(Path(str(path).format(**path_inv)))
+
+    if not label:
+        now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
+        datasetids = ",".join(datasets["Dataset ID"])
+        if len(datasetids) > 80:
+            datasetids = f"{len(datasets['Dataset ID'])} datasets"
+        label = f"DKIST Python Tools - {now} - {datasetids}"
+
+    # Globus limits labels to 128 characters, so truncate if needed
+    # In principle this can't happen because of the truncation above, but just in case
     if len(label) > 128:
         label = label[:125] + "..."  # pragma: no cover

-    _orchestrate_transfer_task(file_list,
+    _orchestrate_transfer_task(source_paths,
                                recursive=True,
-                               destination_path=destination_path,
+                               destination_path=destination_paths,
                                destination_endpoint=destination_endpoint,
                                progress=progress,
                                wait=wait,
                                label=label)

-    return destination_path / dataset_id
+    return destination_paths
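
Combined with the docstring change above, destination paths are rendered per dataset, and ``{dataset_id}`` is appended automatically when it is not already the final path element. A sketch with hypothetical IDs:

```python
from dkist.net import transfer_complete_datasets

# Both datasets go into one Globus task; each lands in its own directory,
# e.g. ~/dkist/pid_1_123/AGLKO (proposal and dataset IDs hypothetical),
# because "{dataset_id}" is appended to the path template automatically.
paths = transfer_complete_datasets(
    ["AGLKO", "BKEWK"],                    # hypothetical dataset IDs
    path="~/dkist/{primary_proposal_id}",
    wait=False,                            # submit and return immediately
)
```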