Add a Dataset.download method #57

Merged · 7 commits · May 24, 2019
2 changes: 2 additions & 0 deletions changelog/57.feature.rst
@@ -0,0 +1,2 @@
Initial implementation of the `dkist.Dataset.download` method for transferring
files via Globus.
57 changes: 29 additions & 28 deletions dkist/asdf_maker/generator.py
@@ -452,34 +452,35 @@ def generate_datset_inventory_from_headers(headers, asdf_name):

     schema = [
         ('asdf_object_key', str),
-        ('browse_movie_object_key', str),
-        ('browse_movie_url', str),
-        ('bucket', str),
-        ('contributing_experiment_ids', list),
-        ('contributing_proposal_ids', list),
-        ('dataset_id', str),
-        ('dataset_inventory_id', int),
-        ('dataset_size', int),
-        ('end_time', Time),
-        ('exposure_time', float),
-        ('filter_wavelengths', list),
-        ('frame_count', int),
-        ('has_all_stokes', bool),
-        ('instrument_name', str),
-        ('observables', list),
-        ('original_frame_count', int),
-        ('primary_experiment_id', str),
-        ('primary_proposal_id', str),
-        ('quality_average_fried_parameter', float),
-        ('quality_average_polarimetric_accuracy', float),
-        ('recipe_id', int),
-        ('recipe_instance_id', int),
-        ('recipe_run_id', int),
-        ('start_time', Time),
-        # ('stokes_parameters', str),
-        ('target_type', str),
-        ('wavelength_max', float),
-        ('wavelength_min', float)]
+        ('browse_movie_object_key', str),
+        ('browse_movie_url', str),
+        ('bucket', str),
+        ('contributing_experiment_ids', list),
+        ('contributing_proposal_ids', list),
+        ('dataset_id', str),
+        ('dataset_inventory_id', int),
+        ('dataset_size', int),
+        ('end_time', Time),
+        ('exposure_time', float),
+        ('filter_wavelengths', list),
+        ('frame_count', int),
+        ('has_all_stokes', bool),
+        ('instrument_name', str),
+        ('observables', list),
+        ('original_frame_count', int),
+        ('primary_experiment_id', str),
+        ('primary_proposal_id', str),
+        ('quality_average_fried_parameter', float),
+        ('quality_average_polarimetric_accuracy', float),
+        ('recipe_id', int),
+        ('recipe_instance_id', int),
+        ('recipe_run_id', int),
+        ('start_time', Time),
+        # ('stokes_parameters', str),
+        ('target_type', str),
+        ('wavelength_max', float),
+        ('wavelength_min', float)
+    ]

header_mapping = {
'start_time': 'DATE-BGN',
57 changes: 57 additions & 0 deletions dkist/dataset/dataset.py
@@ -11,6 +11,9 @@

from dkist.dataset.mixins import DatasetPlotMixin, DatasetSlicingMixin
from dkist.io import AstropyFITSLoader, DaskFITSArrayContainer
from dkist.utils.globus import (DKIST_DATA_CENTRE_DATASET_PATH, DKIST_DATA_CENTRE_ENDPOINT_ID,
start_transfer_from_file_list, watch_transfer_progress)
from dkist.utils.globus.endpoints import get_local_endpoint_id, get_transfer_client

try:
from importlib import resources # >= py 3.7
@@ -299,3 +302,57 @@ def crop_by_coords(self, lower_corner, upper_corner, units=None):
int(np.ceil(axis_pixels.value.max()))+1)
for axis_pixels in all_pix_corners])
return self[item]

@property
def filenames(self):
"""
The filenames referenced by this dataset.

.. note::
            These are not the full file paths.
"""
if self._array_container is None:
return []
else:
return self._array_container.filenames

def download(self, path="/~/", destination_endpoint=None, progress=True):
"""
Start a Globus file transfer for all files in this Dataset.

Parameters
----------
path : `pathlib.Path` or `str`, optional
            The path to save the data in; it must be accessible by the Globus
            endpoint.

destination_endpoint : `str`, optional
            A unique specifier for a Globus endpoint. If `None`, a local
            endpoint will be used if one can be found; otherwise an error will
be raised. See `~dkist.utils.globus.get_endpoint_id` for valid
endpoint specifiers.

progress : `bool`, optional
            If `True`, status information and a progress bar will be displayed
while waiting for the transfer to complete.
"""

base_path = DKIST_DATA_CENTRE_DATASET_PATH.format(**self.meta)
# TODO: Default path to the config file
destination_path = Path(path) / self.meta['dataset_id']

file_list = self.filenames
file_list.append(Path(self.meta['asdf_object_key']))

if not destination_endpoint:
destination_endpoint = get_local_endpoint_id()

task_id = start_transfer_from_file_list(DKIST_DATA_CENTRE_ENDPOINT_ID,
destination_endpoint, base_path,
file_list, destination_path)

tc = get_transfer_client()
if progress:
watch_transfer_progress(task_id, tc)
else:
tc.task_wait(task_id, timeout=1e6)
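Note for reviewers: a minimal usage sketch of the new method. The `ds` object stands in for any `Dataset` with populated `meta` and file references, and the endpoint ID below is invented:

```python
# With no arguments, files go to a local Globus Connect Personal endpoint
# (if one can be found), saved under /~/<dataset_id>, with a progress bar:
ds.download()

# Or name the destination endpoint and path explicitly and wait quietly:
ds.download(path="/scratch/dkist",
            destination_endpoint="aaaabbbb-1111-2222-3333-ccccddddeeee",
            progress=False)
```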
12 changes: 11 additions & 1 deletion dkist/dataset/tests/conftest.py
@@ -7,8 +7,10 @@
import gwcs
import gwcs.coordinate_frames as cf
from sunpy.coordinates.frames import Helioprojective
from asdf import ExternalArrayReference

from dkist.dataset import Dataset
from dkist.io import AstropyFITSLoader, DaskFITSArrayContainer


@pytest.fixture
@@ -79,10 +81,18 @@ def identity_gwcs_4d():

 @pytest.fixture
 def dataset(array, identity_gwcs):
-    ds = Dataset(array, wcs=identity_gwcs)
+    meta = {'bucket': 'data',
+            'dataset_id': 'test_dataset',
+            'asdf_object_key': 'test_dataset.asdf'}
+    ds = Dataset(array, wcs=identity_gwcs, meta=meta)
     # Sanity checks
     assert ds.data is array
     assert ds.wcs is identity_gwcs

+    ds._array_container = DaskFITSArrayContainer([ExternalArrayReference('test1.fits', 0, float, (10, 10)),
+                                                  ExternalArrayReference('test2.fits', 0, float, (10, 10))],
+                                                 loader=AstropyFITSLoader)
+
     return ds
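With this fixture, the dataset's file references resolve to just the two FITS names, which the download tests below extend with the ASDF object key. A hedged illustration of the resulting transfer list (values come from the fixture above, and it assumes the container's `filenames` property yields the bare file names):

```python
from pathlib import Path

# dataset.filenames is backed by the DaskFITSArrayContainer references:
assert dataset.filenames == ['test1.fits', 'test2.fits']

# Dataset.download appends the ASDF file itself before starting the transfer:
file_list = dataset.filenames + [Path(dataset.meta['asdf_object_key'])]
# -> ['test1.fits', 'test2.fits', PosixPath('test_dataset.asdf')]
```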


51 changes: 51 additions & 0 deletions dkist/dataset/tests/test_dataset.py
@@ -12,6 +12,7 @@

from dkist.data.test import rootdir
from dkist.dataset import Dataset
from dkist.utils.globus import DKIST_DATA_CENTRE_ENDPOINT_ID


@pytest.fixture
@@ -127,3 +128,53 @@ def test_array_container():
dataset.array_container = 10

assert len(dataset.array_container.filenames) == 11
assert len(dataset.filenames) == 11


def test_no_filenames(dataset_3d):
assert dataset_3d.filenames == []


def test_download(mocker, dataset):
mocker.patch("dkist.dataset.dataset.watch_transfer_progress",
autospec=True)
mocker.patch("dkist.dataset.dataset.get_local_endpoint_id",
autospec=True, return_value="mysecretendpoint")
mocker.patch("dkist.dataset.dataset.get_transfer_client",
autospec=True)
start_mock = mocker.patch("dkist.dataset.dataset.start_transfer_from_file_list",
autospec=True, return_value="1234")

file_list = dataset.filenames + [Path("test_dataset.asdf")]

dataset.download()

start_mock.assert_called_once_with(DKIST_DATA_CENTRE_ENDPOINT_ID,
"mysecretendpoint",
"/data/test_dataset",
file_list,
Path("/~/test_dataset"))


def test_download_no_progress(mocker, dataset):
progress_mock = mocker.patch("dkist.dataset.dataset.watch_transfer_progress",
autospec=True)
mocker.patch("dkist.dataset.dataset.get_local_endpoint_id",
autospec=True, return_value="mysecretendpoint")
tc_mock = mocker.patch("dkist.dataset.dataset.get_transfer_client",
autospec=True)
start_mock = mocker.patch("dkist.dataset.dataset.start_transfer_from_file_list",
autospec=True, return_value="1234")

file_list = dataset.filenames + [Path("test_dataset.asdf")]

dataset.download(progress=False)

start_mock.assert_called_once_with(DKIST_DATA_CENTRE_ENDPOINT_ID,
"mysecretendpoint",
"/data/test_dataset",
file_list,
Path("/~/test_dataset"))

progress_mock.assert_not_called()
tc_mock.return_value.task_wait.assert_called_once_with("1234", timeout=1e6)
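One thing worth noting about both tests: the helpers are patched at their import site in `dkist.dataset.dataset`, not in `dkist.utils.globus`, so the method under test sees the mocks. A minimal sketch of the pattern:

```python
# dataset.py does "from dkist.utils.globus... import get_transfer_client", so
# the name lives in dataset.py's namespace and must be patched there:
mocker.patch("dkist.dataset.dataset.get_transfer_client", autospec=True)

# Patching the source module instead would leave the reference already
# imported into dataset.py pointing at the real function:
# mocker.patch("dkist.utils.globus.endpoints.get_transfer_client")  # no effect
```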
9 changes: 9 additions & 0 deletions dkist/utils/globus/__init__.py
@@ -4,3 +4,12 @@
from .auth import *
from .endpoints import *
from .transfer import *

DKIST_DATA_CENTRE_ENDPOINT_ID = '5c510e9e-56fa-11e9-9e6e-0266b1fe9f9e'
"""The Globus endpoint ID of the main DKIST globus endpoint."""

DKIST_DATA_CENTRE_DATASET_PATH = "/{bucket}/{dataset_id}"
"""
The path template to a dataset on the main endpoint. It should only use keys
from the asdf metadata schema.
"""
8 changes: 4 additions & 4 deletions dkist/utils/globus/endpoints.py
Expand Up @@ -59,11 +59,11 @@ def get_endpoint_id(endpoint, tfr_client):
"""
Resolve an endpoint description to an ID.

This uses the `endpoint search
If the endpoint description is not a endpoint ID The `endpoint search
<https://docs.globus.org/api/transfer/endpoint_search/#endpoint_search>`__
functionality of the Globus API, so any endpoint search can be specified.
One and only one result must be returned from the search or a `ValueError`
will be raised.
functionality of the Globus API will be used, so any endpoint search can be
specified. One and only one result must be returned from the search or a
`ValueError` will be raised.

Parameters
----------
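A hedged sketch of exercising the revised resolution logic; the endpoint display name is invented, and `get_transfer_client` may trigger a Globus login:

```python
from dkist.utils.globus.endpoints import get_endpoint_id, get_transfer_client

tc = get_transfer_client()
# A string that is not an endpoint ID triggers an endpoint search, which must
# match exactly one endpoint or a ValueError is raised:
endpoint_id = get_endpoint_id("my laptop", tc)
```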
2 changes: 2 additions & 0 deletions dkist/utils/globus/tests/test_transfer.py
@@ -56,6 +56,8 @@ def mock_task_event_list(mocker):
def test_start_transfer(mocker, transfer_client, mock_endpoints):
submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
return_value={"task_id": "task_id"})
mocker.patch("globus_sdk.TransferClient.get_submission_id",
return_value={'value': "wibble"})
file_list = list(map(pathlib.Path, ["/a/name.fits", "/a/name2.fits"]))
start_transfer_from_file_list("a", "b", "/", file_list)
calls = mock_endpoints.call_args_list