From 07512961cce6a42dbb6b3706fecf67d9098f55db Mon Sep 17 00:00:00 2001 From: Stuart Mumford Date: Mon, 13 May 2019 19:12:54 +0100 Subject: [PATCH 1/6] Add support for globus transfer and monitoring --- dkist/utils/globus/endpoints.py | 10 +- dkist/utils/globus/transfer.py | 175 ++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 5 deletions(-) create mode 100644 dkist/utils/globus/transfer.py diff --git a/dkist/utils/globus/endpoints.py b/dkist/utils/globus/endpoints.py index b503752c..733cec87 100644 --- a/dkist/utils/globus/endpoints.py +++ b/dkist/utils/globus/endpoints.py @@ -103,18 +103,18 @@ def get_endpoint_id(endpoint, tfr_client): @ensure_globus_authorized -def auto_activate_endpoint(tfr_client, endpoint_id): # pragma: no cover +def auto_activate_endpoint(endpoint_id, tfr_client): # pragma: no cover """ Perform activation of a Globus endpoint. Parameters ---------- - tfr_client : `globus_sdk.TransferClient` - The transfer client to use for the activation. - endpoint_id : `str` The uuid of the endpoint to activate. + tfr_client : `globus_sdk.TransferClient` + The transfer client to use for the activation. + """ activation = tfr_client.endpoint_get_activation_requirements(endpoint_id) needs_activation = bool(activation['DATA']) @@ -160,7 +160,7 @@ def get_directory_listing(path, endpoint=None): if endpoint_id is None: endpoint_id = get_endpoint_id(endpoint, tc) - auto_activate_endpoint(tc, endpoint_id) + auto_activate_endpoint(endpoint_id, tc) response = tc.operation_ls(endpoint_id, path=path.as_posix()) names = [r['name'] for r in response] diff --git a/dkist/utils/globus/transfer.py b/dkist/utils/globus/transfer.py new file mode 100644 index 00000000..7161a84c --- /dev/null +++ b/dkist/utils/globus/transfer.py @@ -0,0 +1,175 @@ +""" +Functions and helpers for orchestrating and monitoring transfers using Globus. 
+""" +import copy +import json +import time +import pathlib +import datetime + +import globus_sdk +from tqdm import tqdm + +from .endpoints import auto_activate_endpoint, get_endpoint_id, get_transfer_client + + +def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, file_list, + src_base_path=None): + """ + Start a new transfer task for a list of files. + + Parameters + ---------- + src_endpoint : `str` + The endpoint to copy files from. Can be any identifier accepted by + `~dkist.utils.globus.get_endpoint_id`. + + dst_endpoint : `str` + The endpoint to copy files to. Can be any identifier accepted by + `~dkist.utils.globus.get_endpoint_id`. + + dst_base_path : `~pathlib.Path` + The destination path, must be accessible from the endpoint, will be + created if it does not exist. + + file_list : `list` + The list of file paths on the ``src_endpoint`` to transfer to the ``dst_endpoint``. + + src_base_path : `~pathlib.Path`, optional + The path prefix on the items in ``file_list`` to be stripped before + copying to ``dst_base_path``. e.g. if a file path in ``file_list`` is + ``/spam/eggs/filename.fits`` and ``src_base_path`` is ``/spam`` the + ``eggs/`` folder will be copied to ``dst_base_path``. By default only + the filenames are kept, and none of the directories. + + Returns + ------- + `str` + Task ID. 
+ """ + + # Get a transfer client instance + tc = get_transfer_client() + + # Resolve to IDs and activate endpoints + src_endpoint = get_endpoint_id(src_endpoint, tc) + auto_activate_endpoint(src_endpoint, tc) + + dst_endpoint = get_endpoint_id(dst_endpoint, tc) + auto_activate_endpoint(dst_endpoint, tc) + + now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") + transfer_manifest = globus_sdk.TransferData(tc, src_endpoint, dst_endpoint, + label=f"DKIST Python Tools - {now}", + sync_level="checksum", + verify_checksum=True) + + dst_base_path = pathlib.Path(dst_base_path) + src_file_list = copy.copy(file_list) + dst_file_list = [] + for src_file in src_file_list: + # If a common prefix is not specified just copy the filename + if not src_base_path: + src_filepath = src_file.name + else: + # Otherwise use the filepath relative to the base path + src_filepath = src_file.relative_to(src_base_path) + dst_file_list.append(dst_base_path / src_filepath) + + for src_file, dst_file in zip(src_file_list, dst_file_list): + transfer_manifest.add_item(str(src_file), str(dst_file)) + + return tc.submit_transfer(transfer_manifest)["task_id"] + +def _process_task_events(task_id, prev_events, tfr_client): + """ + Process a globus task event list. + + This splits the events up into message events, which are events where the + details field is a string, and json events, which are events where the details + field is a json object. + + Parameters + ---------- + prev_events : `set` + A set of already processed events. + + tfr_client : `globus_sdk.TransferClient` + The transfer client to use to get the events. + + Returns + ------- + prev_events : `set` + The complete set of all events processed so far. + + json_events : `tuple` of `dict` + All the events with json bodies. + + message_events : `tuple` of `dict` + All the events with message bodies. 
+ """ + + # Convert all the events into a (key, value) tuple pair + events = set(map(lambda x: tuple(x.data.items()), + tfr_client.task_event_list(task_id, None))) + # Drop all events we have seen before + new_events = events.difference(prev_events) + + json_events = set(filter(lambda x: dict(x).get("details", "").startswith("{"), new_events)) + message_events = tuple(map(dict, (new_events.difference(json_events)))) + + def json_loader(x): + x['details'] = json.loads(x['details']) + return x + + if json_events: + json_events = tuple(map(dict, map(json_loader, map(dict, json_events)))) + else: + json_events = ({},) + + return events, json_events, message_events + + +def _get_speed(event): + """ + A helper function to extract the speed from an event. + """ + if event.get('code', "").lower() == "progress" and isinstance(event['details'], dict): + return event['details'].get("mbps") + + +def watch_transfer_progress(task_id, tfr_client, total=None, poll_interval=5, verbose=False): + """ + """ + prev_events = set() + progress = tqdm(total=total, unit="file", + dynamic_ncols=True, + bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{rate_fmt}{postfix}]') + while True: + (prev_events, + json_events, + message_events) = _process_task_events(task_id, prev_events, tfr_client) + # Print status messages if verbose or if they are errors + for event in message_events: + if event['is_error'] or verbose: + progress.write(f"{event['code']}: {event['details']}") + + # Extract and calculate the transfer speed from the event list + speed = (list(map(_get_speed, json_events)) or [None])[0] + speed = f"{speed} Mb/s" if speed else "" + if speed: + progress.set_postfix_str(speed) + + # Get the status of the task to see how many files we have processed. + task = tfr_client.get_task(task_id) + status = task['status'] + progress.update((task['files_skipped'] + task['files_transferred']) - progress.n) + + # If the status of the task is not active we are finished. 
+ if status != "ACTIVE": + progress.write(f"Task completed with {status} status.") + progress.close() + break + + # Wait for next poll + time.sleep(poll_interval) From 2ed4516c8fc65b52255831eb7e478e4d34b60d1d Mon Sep 17 00:00:00 2001 From: Stuart Mumford Date: Tue, 14 May 2019 11:59:02 +0100 Subject: [PATCH 2/6] Further progress bar tweaks and notebook support --- dkist/utils/__init__.py | 17 +++++++++++ dkist/utils/globus/transfer.py | 55 ++++++++++++++++++++++++++++++---- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/dkist/utils/__init__.py b/dkist/utils/__init__.py index d3f4517a..028b50cd 100644 --- a/dkist/utils/__init__.py +++ b/dkist/utils/__init__.py @@ -2,3 +2,20 @@ # This sub-module is destined for common non-package specific utility # functions. + + +def in_notebook(): + """ + Attempts to detect if this python process is connected to a Jupyter Notebook. + """ + try: + import ipykernel.zmqshell + shell = get_ipython() # noqa + if isinstance(shell, ipykernel.zmqshell.ZMQInteractiveShell): + # Check that we can import the right widget + from tqdm import _tqdm_notebook + _tqdm_notebook.IntProgress + return True + return False + except Exception: + return False diff --git a/dkist/utils/globus/transfer.py b/dkist/utils/globus/transfer.py index 7161a84c..c0ca2c68 100644 --- a/dkist/utils/globus/transfer.py +++ b/dkist/utils/globus/transfer.py @@ -8,8 +8,9 @@ import datetime import globus_sdk -from tqdm import tqdm +from tqdm import tqdm, tqdm_notebook +from .. import in_notebook from .endpoints import auto_activate_endpoint, get_endpoint_id, get_transfer_client @@ -81,6 +82,7 @@ def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, fil return tc.submit_transfer(transfer_manifest)["task_id"] + def _process_task_events(task_id, prev_events, tfr_client): """ Process a globus task event list. 
@@ -115,17 +117,21 @@ def _process_task_events(task_id, prev_events, tfr_client): # Drop all events we have seen before new_events = events.difference(prev_events) + # Filter out the events which are json (start with {) json_events = set(filter(lambda x: dict(x).get("details", "").startswith("{"), new_events)) + # All the other events are message events message_events = tuple(map(dict, (new_events.difference(json_events)))) def json_loader(x): + """Modify the event so the json is a dict.""" x['details'] = json.loads(x['details']) return x + # If some of the events are json events, load the json. if json_events: json_events = tuple(map(dict, map(json_loader, map(dict, json_events)))) else: - json_events = ({},) + json_events = tuple() return events, json_events, message_events @@ -138,17 +144,53 @@ def _get_speed(event): return event['details'].get("mbps") -def watch_transfer_progress(task_id, tfr_client, total=None, poll_interval=5, verbose=False): +def get_progress_bar(*args, **kwargs): + """ + Return the correct tqdm instance. """ + notebook = in_notebook() + if not notebook: + kwargs['bar_format'] = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{rate_fmt}{postfix}]' + else: + # TODO: Both having this and not having it breaks things. + kwargs['total'] = kwargs.get("total", 1e9) + the_tqdm = tqdm if not in_notebook() else tqdm_notebook + return the_tqdm(*args, **kwargs) + + +def watch_transfer_progress(task_id, tfr_client, poll_interval=5, verbose=False): """ + Wait for a Globus transfer task to finish and display a progress bar. + + Parameters + ---------- + task_id : `str` + The task to monitor. + + tfr_client : `globus_sdk.TransferClient` + The transfer client to use to monitor the task. + + poll_interval : `int`, optional + The number of seconds to wait between API calls. + + verbose : `bool` + If `True` print all events received from Globus, defaults to `False` + which just prints Error events. 
+ """ + started = False prev_events = set() - progress = tqdm(total=total, unit="file", - dynamic_ncols=True, - bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{rate_fmt}{postfix}]') + progress = None + progress = get_progress_bar(unit="file", + dynamic_ncols=True) while True: (prev_events, json_events, message_events) = _process_task_events(task_id, prev_events, tfr_client) + + if ('code', 'STARTED') not in prev_events and not started: + started = True + progress.write("PENDING: Starting Transfer") + # Print status messages if verbose or if they are errors for event in message_events: if event['is_error'] or verbose: @@ -163,6 +205,7 @@ def watch_transfer_progress(task_id, tfr_client, total=None, poll_interval=5, ve # Get the status of the task to see how many files we have processed. task = tfr_client.get_task(task_id) status = task['status'] + progress.total = task['files'] progress.update((task['files_skipped'] + task['files_transferred']) - progress.n) # If the status of the task is not active we are finished. From 731095dace136f415618429f1b7def430a689715 Mon Sep 17 00:00:00 2001 From: Stuart Mumford Date: Tue, 14 May 2019 15:51:21 +0100 Subject: [PATCH 3/6] Some cleanup --- dkist/utils/__init__.py | 2 +- dkist/utils/globus/__init__.py | 1 + dkist/utils/globus/transfer.py | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/dkist/utils/__init__.py b/dkist/utils/__init__.py index 028b50cd..45a15e88 100644 --- a/dkist/utils/__init__.py +++ b/dkist/utils/__init__.py @@ -4,7 +4,7 @@ # functions. -def in_notebook(): +def in_notebook(): # pragma: no cover """ Attempts to detect if this python process is connected to a Jupyter Notebook. 
""" diff --git a/dkist/utils/globus/__init__.py b/dkist/utils/globus/__init__.py index 99485fb1..f596873d 100644 --- a/dkist/utils/globus/__init__.py +++ b/dkist/utils/globus/__init__.py @@ -3,3 +3,4 @@ """ from .auth import * from .endpoints import * +from .transfer import * diff --git a/dkist/utils/globus/transfer.py b/dkist/utils/globus/transfer.py index c0ca2c68..5b9d0664 100644 --- a/dkist/utils/globus/transfer.py +++ b/dkist/utils/globus/transfer.py @@ -14,6 +14,9 @@ from .endpoints import auto_activate_endpoint, get_endpoint_id, get_transfer_client +__all__ = ['watch_transfer_progress', 'start_transfer_from_file_list'] + + def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, file_list, src_base_path=None): """ From 82667751c6db52a81872020d7284066b563ac1f0 Mon Sep 17 00:00:00 2001 From: Stuart Mumford Date: Tue, 14 May 2019 15:51:37 +0100 Subject: [PATCH 4/6] First test for transfer --- dkist/utils/globus/tests/conftest.py | 16 ++++++++++ dkist/utils/globus/tests/test_endpoints.py | 10 ------- dkist/utils/globus/tests/test_transfer.py | 34 ++++++++++++++++++++++ 3 files changed, 50 insertions(+), 10 deletions(-) create mode 100644 dkist/utils/globus/tests/conftest.py create mode 100644 dkist/utils/globus/tests/test_transfer.py diff --git a/dkist/utils/globus/tests/conftest.py b/dkist/utils/globus/tests/conftest.py new file mode 100644 index 00000000..5fb8a6a6 --- /dev/null +++ b/dkist/utils/globus/tests/conftest.py @@ -0,0 +1,16 @@ +from unittest import mock +import pytest + +from dkist.utils.globus.endpoints import get_transfer_client + + +@pytest.fixture() +def transfer_client(mocker): + tc = get_transfer_client() + mocker.patch("dkist.utils.globus.endpoints.get_transfer_client", + return_value=tc) + mocker.patch("dkist.utils.globus.transfer.get_transfer_client", + return_value=tc) + mocker.patch("dkist.utils.globus.auth.get_refresh_token_authorizer", + return_value=None) + return tc diff --git 
a/dkist/utils/globus/tests/test_endpoints.py b/dkist/utils/globus/tests/test_endpoints.py index 10c5f96a..ce4c479e 100644 --- a/dkist/utils/globus/tests/test_endpoints.py +++ b/dkist/utils/globus/tests/test_endpoints.py @@ -31,16 +31,6 @@ def ls_response(mocker): return resp -@pytest.fixture -def transfer_client(mocker): - tc = get_transfer_client() - mocker.patch("dkist.utils.globus.endpoints.get_transfer_client", - return_value=tc) - mocker.patch("dkist.utils.globus.auth.get_refresh_token_authorizer", - return_value=None) - return tc - - @pytest.fixture def mock_search(mocker): mocker.patch("globus_sdk.TransferClient.endpoint_search", diff --git a/dkist/utils/globus/tests/test_transfer.py b/dkist/utils/globus/tests/test_transfer.py new file mode 100644 index 00000000..f793eef2 --- /dev/null +++ b/dkist/utils/globus/tests/test_transfer.py @@ -0,0 +1,34 @@ +import pathlib +from unittest import mock + +import pytest + +from dkist.utils.globus import start_transfer_from_file_list + + +@pytest.fixture +def mock_endpoints(mocker): + def id_mock(endpoint, tc): + return endpoint + mocker.patch("dkist.utils.globus.transfer.auto_activate_endpoint") + return mocker.patch("dkist.utils.globus.transfer.get_endpoint_id", + side_effect=id_mock) + + +def test_start_transfer(mocker, transfer_client, mock_endpoints): + submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer", + return_value={"task_id": "task_id"}) + file_list = list(map(pathlib.Path, ["/a/name.fits", "/a/name2.fits"])) + start_transfer_from_file_list("a", "b", "/", file_list) + calls = mock_endpoints.call_args_list + assert calls[0][0][0] == "a" + assert calls[1][0][0] == "b" + + submit_mock.assert_called_once() + transfer_manifest = submit_mock.call_args_list[0][0][0]['DATA'] + + for filepath, tfr in zip(file_list, transfer_manifest): + assert str(filepath) == tfr['source_path'] + assert "/" + filepath.name == tfr['destination_path'] + + From 568a672b47dd8e9b86fd83abb996dbf8ae74dab5 Mon Sep 17 
00:00:00 2001 From: Stuart Mumford Date: Thu, 16 May 2019 11:56:11 +0100 Subject: [PATCH 5/6] Add some tests for transfer --- dkist/utils/globus/tests/conftest.py | 4 + dkist/utils/globus/tests/test_transfer.py | 111 +++++++++++++++++++++- dkist/utils/globus/transfer.py | 4 +- 3 files changed, 116 insertions(+), 3 deletions(-) diff --git a/dkist/utils/globus/tests/conftest.py b/dkist/utils/globus/tests/conftest.py index 5fb8a6a6..e4874c61 100644 --- a/dkist/utils/globus/tests/conftest.py +++ b/dkist/utils/globus/tests/conftest.py @@ -6,7 +6,11 @@ @pytest.fixture() def transfer_client(mocker): + mocker.patch("globus_sdk.TransferClient.get_submission_id", + return_value={'value': "1234"}) + tc = get_transfer_client() + mocker.patch("dkist.utils.globus.endpoints.get_transfer_client", return_value=tc) mocker.patch("dkist.utils.globus.transfer.get_transfer_client", diff --git a/dkist/utils/globus/tests/test_transfer.py b/dkist/utils/globus/tests/test_transfer.py index f793eef2..2e47a7df 100644 --- a/dkist/utils/globus/tests/test_transfer.py +++ b/dkist/utils/globus/tests/test_transfer.py @@ -2,8 +2,10 @@ from unittest import mock import pytest +from globus_sdk import GlobusResponse -from dkist.utils.globus import start_transfer_from_file_list +from dkist.utils.globus.transfer import (_get_speed, _process_task_events, + start_transfer_from_file_list) @pytest.fixture @@ -15,6 +17,42 @@ def id_mock(endpoint, tc): side_effect=id_mock) +@pytest.fixture +def mock_task_event_list(mocker): + task_list = [ + GlobusResponse({ + 'DATA_TYPE': 'event', + 'code': 'STARTED', + 'description': 'started', + 'details': + '{\n "type": "GridFTP Transfer", \n "concurrency": 2, \n "protocol": "Mode S"\n}', + 'is_error': False, + 'parent_task_id': None, + 'time': '2019-05-16 10:13:26+00:00' + }), + GlobusResponse({ + 'DATA_TYPE': 'event', + 'code': 'SUCCEEDED', + 'description': 'succeeded', + 'details': 'Scanned 100 file(s)', + 'is_error': False, + 'parent_task_id': None, + 'time': '2019-05-16 
10:13:24+00:00' + }), + GlobusResponse({ + 'DATA_TYPE': 'event', + 'code': 'STARTED', + 'description': 'started', + 'details': 'Starting sync scan', + 'is_error': False, + 'parent_task_id': None, + 'time': '2019-05-16 10:13:20+00:00' + }) + ] + return mocker.patch("globus_sdk.TransferClient.task_event_list", + return_value=task_list) + + def test_start_transfer(mocker, transfer_client, mock_endpoints): submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer", return_value={"task_id": "task_id"}) @@ -32,3 +70,74 @@ def test_start_transfer(mocker, transfer_client, mock_endpoints): assert "/" + filepath.name == tfr['destination_path'] +def test_start_transfer_src_base(mocker, transfer_client, mock_endpoints): + submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer", + return_value={"task_id": "task_id"}) + file_list = list(map(pathlib.Path, ["/a/b/name.fits", "/a/b/name2.fits"])) + start_transfer_from_file_list("a", "b", "/", file_list, "/a") + calls = mock_endpoints.call_args_list + assert calls[0][0][0] == "a" + assert calls[1][0][0] == "b" + + submit_mock.assert_called_once() + transfer_manifest = submit_mock.call_args_list[0][0][0]['DATA'] + + for filepath, tfr in zip(file_list, transfer_manifest): + assert str(filepath) == tfr['source_path'] + assert "/b/" + filepath.name == tfr['destination_path'] + + +def test_process_event_list(transfer_client, mock_task_event_list): + (events, + json_events, + message_events) = _process_task_events("1234", set(), transfer_client) + + assert isinstance(events, set) + assert all([isinstance(e, tuple) for e in events]) + assert all([all([isinstance(item, tuple) for item in e]) for e in events]) + + print(events) + assert len(json_events) == 1 + assert isinstance(json_events, tuple) + assert isinstance(json_events[0], dict) + assert isinstance(json_events[0]['details'], dict) + assert json_events[0]['code'] == 'STARTED' + + assert len(message_events) == 2 + assert isinstance(message_events, tuple) + 
assert isinstance(message_events[0], dict) + assert isinstance(message_events[0]['details'], str) + + +def test_process_event_list_message_only(transfer_client, mock_task_event_list): + # Filter out the json event + prev_events = tuple(map(lambda x: tuple(x.data.items()), + mock_task_event_list.return_value)) + prev_events = set(prev_events[0:1]) + + (events, + json_events, + message_events) = _process_task_events("1234", prev_events, transfer_client) + + assert isinstance(events, set) + assert all([isinstance(e, tuple) for e in events]) + assert all([all([isinstance(item, tuple) for item in e]) for e in events]) + + assert len(json_events) == 0 + assert isinstance(json_events, tuple) + + assert len(message_events) == 2 + assert isinstance(message_events, tuple) + assert isinstance(message_events[0], dict) + assert isinstance(message_events[0]['details'], str) + + +def test_get_speed(): + speed = _get_speed({'code': "PROGRESS", 'details': {'mbps': 10}}) + assert speed == 10 + speed = _get_speed({}) + assert speed is None + speed = _get_speed({'code': "progress", "details": "hello"}) + assert speed is None + speed = _get_speed({'code': "progress", "details": {"hello": "world"}}) + assert speed is None diff --git a/dkist/utils/globus/transfer.py b/dkist/utils/globus/transfer.py index 5b9d0664..f0205620 100644 --- a/dkist/utils/globus/transfer.py +++ b/dkist/utils/globus/transfer.py @@ -147,7 +147,7 @@ def _get_speed(event): return event['details'].get("mbps") -def get_progress_bar(*args, **kwargs): +def get_progress_bar(*args, **kwargs): # pragma: no cover """ Return the correct tqdm instance. """ @@ -161,7 +161,7 @@ def get_progress_bar(*args, **kwargs): return the_tqdm(*args, **kwargs) -def watch_transfer_progress(task_id, tfr_client, poll_interval=5, verbose=False): +def watch_transfer_progress(task_id, tfr_client, poll_interval=5, verbose=False): # pragma: no cover """ Wait for a Globus transfer task to finish and display a progress bar. 
From a75b8a6110f547b27116b8137afcb565146759af Mon Sep 17 00:00:00 2001 From: Stuart Mumford Date: Thu, 16 May 2019 11:57:53 +0100 Subject: [PATCH 6/6] Add changelog --- changelog/55.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/55.feature.rst diff --git a/changelog/55.feature.rst b/changelog/55.feature.rst new file mode 100644 index 00000000..2eb6162b --- /dev/null +++ b/changelog/55.feature.rst @@ -0,0 +1 @@ +Added support for starting and monitoring Globus transfer tasks