diff --git a/changelog/55.feature.rst b/changelog/55.feature.rst
new file mode 100644
index 00000000..2eb6162b
--- /dev/null
+++ b/changelog/55.feature.rst
@@ -0,0 +1 @@
+Added support for starting and monitoring Globus transfer tasks
diff --git a/dkist/utils/__init__.py b/dkist/utils/__init__.py
index d3f4517a..45a15e88 100644
--- a/dkist/utils/__init__.py
+++ b/dkist/utils/__init__.py
@@ -2,3 +2,20 @@
 
 # This sub-module is destined for common non-package specific utility
 # functions.
+
+
+def in_notebook():  # pragma: no cover
+    """
+    Attempts to detect if this Python process is connected to a Jupyter Notebook.
+    """
+    try:
+        import ipykernel.zmqshell
+        shell = get_ipython()  # noqa
+        if isinstance(shell, ipykernel.zmqshell.ZMQInteractiveShell):
+            # Check that we can import the right widget
+            from tqdm import _tqdm_notebook
+            _tqdm_notebook.IntProgress
+            return True
+        return False
+    except Exception:
+        return False
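A minimal sketch of how `in_notebook` is consumed later in this diff (see `get_progress_bar` in `transfer.py`): it simply switches between the console and notebook flavours of tqdm. The `total`/`unit` arguments below are illustrative.

# Sketch only: mirrors get_progress_bar() further down in this diff.
from tqdm import tqdm, tqdm_notebook

from dkist.utils import in_notebook

progress_cls = tqdm_notebook if in_notebook() else tqdm
bar = progress_cls(total=100, unit="file")  # illustrative arguments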
diff --git a/dkist/utils/globus/__init__.py b/dkist/utils/globus/__init__.py
index 99485fb1..f596873d 100644
--- a/dkist/utils/globus/__init__.py
+++ b/dkist/utils/globus/__init__.py
@@ -3,3 +3,4 @@
 """
 from .auth import *
 from .endpoints import *
+from .transfer import *
diff --git a/dkist/utils/globus/endpoints.py b/dkist/utils/globus/endpoints.py
index b503752c..733cec87 100644
--- a/dkist/utils/globus/endpoints.py
+++ b/dkist/utils/globus/endpoints.py
@@ -103,18 +103,18 @@ def get_endpoint_id(endpoint, tfr_client):
 
 
 @ensure_globus_authorized
-def auto_activate_endpoint(tfr_client, endpoint_id):  # pragma: no cover
+def auto_activate_endpoint(endpoint_id, tfr_client):  # pragma: no cover
     """
     Perform activation of a Globus endpoint.
 
     Parameters
     ----------
-    tfr_client : `globus_sdk.TransferClient`
-        The transfer client to use for the activation.
-
     endpoint_id : `str`
         The uuid of the endpoint to activate.
 
+    tfr_client : `globus_sdk.TransferClient`
+        The transfer client to use for the activation.
+
     """
     activation = tfr_client.endpoint_get_activation_requirements(endpoint_id)
     needs_activation = bool(activation['DATA'])
@@ -160,7 +160,7 @@ def get_directory_listing(path, endpoint=None):
 
     if endpoint_id is None:
         endpoint_id = get_endpoint_id(endpoint, tc)
-    auto_activate_endpoint(tc, endpoint_id)
+    auto_activate_endpoint(endpoint_id, tc)
 
     response = tc.operation_ls(endpoint_id, path=path.as_posix())
     names = [r['name'] for r in response]
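Since this change swaps the argument order of `auto_activate_endpoint`, a hypothetical call site now reads as below; the endpoint name is a placeholder.

# Hypothetical call site, new argument order: endpoint id first, client second.
from dkist.utils.globus.endpoints import (auto_activate_endpoint, get_endpoint_id,
                                          get_transfer_client)

tc = get_transfer_client()
endpoint_id = get_endpoint_id("my endpoint", tc)  # "my endpoint" is a placeholder
auto_activate_endpoint(endpoint_id, tc)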
diff --git a/dkist/utils/globus/tests/conftest.py b/dkist/utils/globus/tests/conftest.py
new file mode 100644
index 00000000..e4874c61
--- /dev/null
+++ b/dkist/utils/globus/tests/conftest.py
@@ -0,0 +1,20 @@
+from unittest import mock
+import pytest
+
+from dkist.utils.globus.endpoints import get_transfer_client
+
+
+@pytest.fixture()
+def transfer_client(mocker):
+    mocker.patch("globus_sdk.TransferClient.get_submission_id",
+                 return_value={'value': "1234"})
+
+    tc = get_transfer_client()
+
+    mocker.patch("dkist.utils.globus.endpoints.get_transfer_client",
+                 return_value=tc)
+    mocker.patch("dkist.utils.globus.transfer.get_transfer_client",
+                 return_value=tc)
+    mocker.patch("dkist.utils.globus.auth.get_refresh_token_authorizer",
+                 return_value=None)
+    return tc
diff --git a/dkist/utils/globus/tests/test_endpoints.py b/dkist/utils/globus/tests/test_endpoints.py
index 10c5f96a..ce4c479e 100644
--- a/dkist/utils/globus/tests/test_endpoints.py
+++ b/dkist/utils/globus/tests/test_endpoints.py
@@ -31,16 +31,6 @@ def ls_response(mocker):
     return resp
 
 
-@pytest.fixture
-def transfer_client(mocker):
-    tc = get_transfer_client()
-    mocker.patch("dkist.utils.globus.endpoints.get_transfer_client",
-                 return_value=tc)
-    mocker.patch("dkist.utils.globus.auth.get_refresh_token_authorizer",
-                 return_value=None)
-    return tc
-
-
 @pytest.fixture
 def mock_search(mocker):
     mocker.patch("globus_sdk.TransferClient.endpoint_search",
diff --git a/dkist/utils/globus/tests/test_transfer.py b/dkist/utils/globus/tests/test_transfer.py
new file mode 100644
index 00000000..2e47a7df
--- /dev/null
+++ b/dkist/utils/globus/tests/test_transfer.py
@@ -0,0 +1,143 @@
+import pathlib
+from unittest import mock
+
+import pytest
+from globus_sdk import GlobusResponse
+
+from dkist.utils.globus.transfer import (_get_speed, _process_task_events,
+                                         start_transfer_from_file_list)
+
+
+@pytest.fixture
+def mock_endpoints(mocker):
+    def id_mock(endpoint, tc):
+        return endpoint
+    mocker.patch("dkist.utils.globus.transfer.auto_activate_endpoint")
+    return mocker.patch("dkist.utils.globus.transfer.get_endpoint_id",
+                        side_effect=id_mock)
+
+
+@pytest.fixture
+def mock_task_event_list(mocker):
+    task_list = [
+        GlobusResponse({
+            'DATA_TYPE': 'event',
+            'code': 'STARTED',
+            'description': 'started',
+            'details':
+            '{\n "type": "GridFTP Transfer", \n "concurrency": 2, \n "protocol": "Mode S"\n}',
+            'is_error': False,
+            'parent_task_id': None,
+            'time': '2019-05-16 10:13:26+00:00'
+        }),
+        GlobusResponse({
+            'DATA_TYPE': 'event',
+            'code': 'SUCCEEDED',
+            'description': 'succeeded',
+            'details': 'Scanned 100 file(s)',
+            'is_error': False,
+            'parent_task_id': None,
+            'time': '2019-05-16 10:13:24+00:00'
+        }),
+        GlobusResponse({
+            'DATA_TYPE': 'event',
+            'code': 'STARTED',
+            'description': 'started',
+            'details': 'Starting sync scan',
+            'is_error': False,
+            'parent_task_id': None,
+            'time': '2019-05-16 10:13:20+00:00'
+        })
+    ]
+    return mocker.patch("globus_sdk.TransferClient.task_event_list",
+                        return_value=task_list)
+
+
+def test_start_transfer(mocker, transfer_client, mock_endpoints):
+    submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
+                               return_value={"task_id": "task_id"})
+    file_list = list(map(pathlib.Path, ["/a/name.fits", "/a/name2.fits"]))
+    start_transfer_from_file_list("a", "b", "/", file_list)
+    calls = mock_endpoints.call_args_list
+    assert calls[0][0][0] == "a"
+    assert calls[1][0][0] == "b"
+
+    submit_mock.assert_called_once()
+    transfer_manifest = submit_mock.call_args_list[0][0][0]['DATA']
+
+    for filepath, tfr in zip(file_list, transfer_manifest):
+        assert str(filepath) == tfr['source_path']
+        assert "/" + filepath.name == tfr['destination_path']
+
+
+def test_start_transfer_src_base(mocker, transfer_client, mock_endpoints):
+    submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
+                               return_value={"task_id": "task_id"})
+    file_list = list(map(pathlib.Path, ["/a/b/name.fits", "/a/b/name2.fits"]))
+    start_transfer_from_file_list("a", "b", "/", file_list, "/a")
+    calls = mock_endpoints.call_args_list
+    assert calls[0][0][0] == "a"
+    assert calls[1][0][0] == "b"
+
+    submit_mock.assert_called_once()
+    transfer_manifest = submit_mock.call_args_list[0][0][0]['DATA']
+
+    for filepath, tfr in zip(file_list, transfer_manifest):
+        assert str(filepath) == tfr['source_path']
+        assert "/b/" + filepath.name == tfr['destination_path']
+
+
+def test_process_event_list(transfer_client, mock_task_event_list):
+    (events,
+     json_events,
+     message_events) = _process_task_events("1234", set(), transfer_client)
+
+    assert isinstance(events, set)
+    assert all([isinstance(e, tuple) for e in events])
+    assert all([all([isinstance(item, tuple) for item in e]) for e in events])
+
+    print(events)
+    assert len(json_events) == 1
+    assert isinstance(json_events, tuple)
+    assert isinstance(json_events[0], dict)
+    assert isinstance(json_events[0]['details'], dict)
+    assert json_events[0]['code'] == 'STARTED'
+
+    assert len(message_events) == 2
+    assert isinstance(message_events, tuple)
+    assert isinstance(message_events[0], dict)
+    assert isinstance(message_events[0]['details'], str)
+
+
+def test_process_event_list_message_only(transfer_client, mock_task_event_list):
+    # Filter out the json event
+    prev_events = tuple(map(lambda x: tuple(x.data.items()),
+                            mock_task_event_list.return_value))
+    prev_events = set(prev_events[0:1])
+
+    (events,
+     json_events,
+     message_events) = _process_task_events("1234", prev_events, transfer_client)
+
+    assert isinstance(events, set)
+    assert all([isinstance(e, tuple) for e in events])
+    assert all([all([isinstance(item, tuple) for item in e]) for e in events])
+
+    assert len(json_events) == 0
+    assert isinstance(json_events, tuple)
+
+    assert len(message_events) == 2
+    assert isinstance(message_events, tuple)
+    assert isinstance(message_events[0], dict)
+    assert isinstance(message_events[0]['details'], str)
+
+
+def test_get_speed():
+    speed = _get_speed({'code': "PROGRESS", 'details': {'mbps': 10}})
+    assert speed == 10
+    speed = _get_speed({})
+    assert speed is None
+    speed = _get_speed({'code': "progress", "details": "hello"})
+    assert speed is None
+    speed = _get_speed({'code': "progress", "details": {"hello": "world"}})
+    assert speed is None
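The assertions above rely on `_process_task_events` representing each event as a hashable tuple of (key, value) pairs so that it can be stored in a set; a tiny round-trip illustration, using a made-up event dict rather than a real Globus payload:

# Illustration only: dict -> hashable tuple of items -> dict again.
event = {'code': 'STARTED', 'is_error': False}  # made-up event
hashable = tuple(event.items())
assert hashable == (('code', 'STARTED'), ('is_error', False))
assert dict(hashable) == event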
diff --git a/dkist/utils/globus/transfer.py b/dkist/utils/globus/transfer.py
new file mode 100644
index 00000000..f0205620
--- /dev/null
+++ b/dkist/utils/globus/transfer.py
@@ -0,0 +1,221 @@
+"""
+Functions and helpers for orchestrating and monitoring transfers using Globus.
+"""
+import copy
+import json
+import time
+import pathlib
+import datetime
+
+import globus_sdk
+from tqdm import tqdm, tqdm_notebook
+
+from .. import in_notebook
+from .endpoints import auto_activate_endpoint, get_endpoint_id, get_transfer_client
+
+
+__all__ = ['watch_transfer_progress', 'start_transfer_from_file_list']
+
+
+def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, file_list,
+                                  src_base_path=None):
+    """
+    Start a new transfer task for a list of files.
+
+    Parameters
+    ----------
+    src_endpoint : `str`
+        The endpoint to copy files from. Can be any identifier accepted by
+        `~dkist.utils.globus.get_endpoint_id`.
+
+    dst_endpoint : `str`
+        The endpoint to copy files to. Can be any identifier accepted by
+        `~dkist.utils.globus.get_endpoint_id`.
+
+    dst_base_path : `~pathlib.Path`
+        The destination path; it must be accessible from the endpoint and will
+        be created if it does not exist.
+
+    file_list : `list`
+        The list of file paths on the ``src_endpoint`` to transfer to the ``dst_endpoint``.
+
+    src_base_path : `~pathlib.Path`, optional
+        The path prefix on the items in ``file_list`` to be stripped before
+        copying to ``dst_base_path``, i.e. if a file path in ``file_list`` is
+        ``/spam/eggs/filename.fits`` and ``src_base_path`` is ``/spam``, the
+        ``eggs/`` folder will be copied to ``dst_base_path``. By default only
+        the filenames are kept, and none of the directories.
+
+    Returns
+    -------
+    `str`
+        Task ID.
+    """
+
+    # Get a transfer client instance
+    tc = get_transfer_client()
+
+    # Resolve to IDs and activate endpoints
+    src_endpoint = get_endpoint_id(src_endpoint, tc)
+    auto_activate_endpoint(src_endpoint, tc)
+
+    dst_endpoint = get_endpoint_id(dst_endpoint, tc)
+    auto_activate_endpoint(dst_endpoint, tc)
+
+    now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
+    transfer_manifest = globus_sdk.TransferData(tc, src_endpoint, dst_endpoint,
+                                                label=f"DKIST Python Tools - {now}",
+                                                sync_level="checksum",
+                                                verify_checksum=True)
+
+    dst_base_path = pathlib.Path(dst_base_path)
+    src_file_list = copy.copy(file_list)
+    dst_file_list = []
+    for src_file in src_file_list:
+        # If a common prefix is not specified just copy the filename
+        if not src_base_path:
+            src_filepath = src_file.name
+        else:
+            # Otherwise use the filepath relative to the base path
+            src_filepath = src_file.relative_to(src_base_path)
+        dst_file_list.append(dst_base_path / src_filepath)
+
+    for src_file, dst_file in zip(src_file_list, dst_file_list):
+        transfer_manifest.add_item(str(src_file), str(dst_file))
+
+    return tc.submit_transfer(transfer_manifest)["task_id"]
+
+
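A small worked example of the destination-path logic above; the `/level0` destination and the file name are made up.

import pathlib

dst_base = pathlib.Path("/level0")  # illustrative destination
src = pathlib.Path("/spam/eggs/filename.fits")

# Default behaviour: only the file name is kept.
assert dst_base / src.name == pathlib.Path("/level0/filename.fits")

# With src_base_path="/spam" the directory structure below the prefix is preserved.
assert dst_base / src.relative_to("/spam") == pathlib.Path("/level0/eggs/filename.fits")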
+def _process_task_events(task_id, prev_events, tfr_client):
+    """
+    Process a Globus task event list.
+
+    This splits the events up into message events, which are events where
+    the details field is a string, and json events, which are events where
+    the details field is a JSON object.
+
+    Parameters
+    ----------
+    prev_events : `set`
+        A set of already processed events.
+
+    tfr_client : `globus_sdk.TransferClient`
+        The transfer client to use to get the events.
+
+    Returns
+    -------
+    prev_events : `set`
+        The complete set of all events processed so far.
+
+    json_events : `tuple` of `dict`
+        All the events with json bodies.
+
+    message_events : `tuple` of `dict`
+        All the events with message bodies.
+    """
+
+    # Convert all the events into a (key, value) tuple pair
+    events = set(map(lambda x: tuple(x.data.items()),
+                     tfr_client.task_event_list(task_id, None)))
+    # Drop all events we have seen before
+    new_events = events.difference(prev_events)
+
+    # Filter out the events which are json (start with {)
+    json_events = set(filter(lambda x: dict(x).get("details", "").startswith("{"), new_events))
+    # All the other events are message events
+    message_events = tuple(map(dict, (new_events.difference(json_events))))
+
+    def json_loader(x):
+        """Modify the event so the json is a dict."""
+        x['details'] = json.loads(x['details'])
+        return x
+
+    # If some of the events are json events, load the json.
+    if json_events:
+        json_events = tuple(map(dict, map(json_loader, map(dict, json_events))))
+    else:
+        json_events = tuple()
+
+    return events, json_events, message_events
+
+
+def _get_speed(event):
+    """
+    A helper function to extract the speed from an event.
+    """
+    if event.get('code', "").lower() == "progress" and isinstance(event['details'], dict):
+        return event['details'].get("mbps")
+
+
+def get_progress_bar(*args, **kwargs):  # pragma: no cover
+    """
+    Return the correct tqdm instance.
+    """
+    notebook = in_notebook()
+    if not notebook:
+        kwargs['bar_format'] = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{rate_fmt}{postfix}]'
+    else:
+        # TODO: Both having this and not having it breaks things.
+        kwargs['total'] = kwargs.get("total", 1e9)
+    the_tqdm = tqdm if not notebook else tqdm_notebook
+    return the_tqdm(*args, **kwargs)
+
+
+def watch_transfer_progress(task_id, tfr_client, poll_interval=5, verbose=False):  # pragma: no cover
+    """
+    Wait for a Globus transfer task to finish and display a progress bar.
+
+    Parameters
+    ----------
+    task_id : `str`
+        The task to monitor.
+
+    tfr_client : `globus_sdk.TransferClient`
+        The transfer client to use to monitor the task.
+
+    poll_interval : `int`, optional
+        The number of seconds to wait between API calls.
+
+    verbose : `bool`
+        If `True`, print all events received from Globus; defaults to `False`,
+        which only prints error events.
+    """
+    started = False
+    prev_events = set()
+    progress = None
+    progress = get_progress_bar(unit="file",
+                                dynamic_ncols=True)
+    while True:
+        (prev_events,
+         json_events,
+         message_events) = _process_task_events(task_id, prev_events, tfr_client)
+
+        if ('code', 'STARTED') not in prev_events and not started:
+            started = True
+            progress.write("PENDING: Starting Transfer")
+
+        # Print status messages if verbose or if they are errors
+        for event in message_events:
+            if event['is_error'] or verbose:
+                progress.write(f"{event['code']}: {event['details']}")
+
+        # Extract and calculate the transfer speed from the event list
+        speed = (list(map(_get_speed, json_events)) or [None])[0]
+        speed = f"{speed} Mb/s" if speed else ""
+        if speed:
+            progress.set_postfix_str(speed)
+
+        # Get the status of the task to see how many files we have processed.
+        task = tfr_client.get_task(task_id)
+        status = task['status']
+        progress.total = task['files']
+        progress.update((task['files_skipped'] + task['files_transferred']) - progress.n)
+
+        # If the status of the task is not active we are finished.
+        if status != "ACTIVE":
+            progress.write(f"Task completed with {status} status.")
+            progress.close()
+            break
+
+        # Wait for next poll
+        time.sleep(poll_interval)
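For context, a hedged end-to-end sketch of the API added in this file; the endpoint names and paths are placeholders, not real Globus endpoints.

# Sketch only: endpoint names and paths below are placeholders.
import pathlib

from dkist.utils.globus.endpoints import get_transfer_client
from dkist.utils.globus.transfer import (start_transfer_from_file_list,
                                         watch_transfer_progress)

files = [pathlib.Path("/data/eggs/file1.fits"), pathlib.Path("/data/eggs/file2.fits")]
task_id = start_transfer_from_file_list("source endpoint", "destination endpoint",
                                        "/downloads", files, src_base_path="/data")

# Block until the task finishes, drawing a tqdm progress bar while it runs.
tc = get_transfer_client()
watch_transfer_progress(task_id, tc, poll_interval=10)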