Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Globus transfer and monitoring #55

Merged
merged 6 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/55.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for starting and monitoring Globus transfer tasks
17 changes: 17 additions & 0 deletions dkist/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,20 @@

# This sub-module is destined for common non-package specific utility
# functions.


def in_notebook(): # pragma: no cover
"""
Attempts to detect if this python process is connected to a Jupyter Notebook.
"""
try:
import ipykernel.zmqshell
shell = get_ipython() # noqa
if isinstance(shell, ipykernel.zmqshell.ZMQInteractiveShell):
# Check that we can import the right widget
from tqdm import _tqdm_notebook
_tqdm_notebook.IntProgress
return True
return False
except Exception:
return False
1 change: 1 addition & 0 deletions dkist/utils/globus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
"""
from .auth import *
from .endpoints import *
from .transfer import *
10 changes: 5 additions & 5 deletions dkist/utils/globus/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,18 @@ def get_endpoint_id(endpoint, tfr_client):


@ensure_globus_authorized
def auto_activate_endpoint(tfr_client, endpoint_id): # pragma: no cover
def auto_activate_endpoint(endpoint_id, tfr_client): # pragma: no cover
"""
Perform activation of a Globus endpoint.
Parameters
----------
tfr_client : `globus_sdk.TransferClient`
The transfer client to use for the activation.
endpoint_id : `str`
The uuid of the endpoint to activate.
tfr_client : `globus_sdk.TransferClient`
The transfer client to use for the activation.
"""
activation = tfr_client.endpoint_get_activation_requirements(endpoint_id)
needs_activation = bool(activation['DATA'])
Expand Down Expand Up @@ -160,7 +160,7 @@ def get_directory_listing(path, endpoint=None):

if endpoint_id is None:
endpoint_id = get_endpoint_id(endpoint, tc)
auto_activate_endpoint(tc, endpoint_id)
auto_activate_endpoint(endpoint_id, tc)

response = tc.operation_ls(endpoint_id, path=path.as_posix())
names = [r['name'] for r in response]
Expand Down
20 changes: 20 additions & 0 deletions dkist/utils/globus/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from unittest import mock
import pytest

from dkist.utils.globus.endpoints import get_transfer_client


@pytest.fixture()
def transfer_client(mocker):
mocker.patch("globus_sdk.TransferClient.get_submission_id",
return_value={'value': "1234"})

tc = get_transfer_client()

mocker.patch("dkist.utils.globus.endpoints.get_transfer_client",
return_value=tc)
mocker.patch("dkist.utils.globus.transfer.get_transfer_client",
return_value=tc)
mocker.patch("dkist.utils.globus.auth.get_refresh_token_authorizer",
return_value=None)
return tc
10 changes: 0 additions & 10 deletions dkist/utils/globus/tests/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@ def ls_response(mocker):
return resp


@pytest.fixture
def transfer_client(mocker):
tc = get_transfer_client()
mocker.patch("dkist.utils.globus.endpoints.get_transfer_client",
return_value=tc)
mocker.patch("dkist.utils.globus.auth.get_refresh_token_authorizer",
return_value=None)
return tc


@pytest.fixture
def mock_search(mocker):
mocker.patch("globus_sdk.TransferClient.endpoint_search",
Expand Down
143 changes: 143 additions & 0 deletions dkist/utils/globus/tests/test_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import pathlib
from unittest import mock

import pytest
from globus_sdk import GlobusResponse

from dkist.utils.globus.transfer import (_get_speed, _process_task_events,
start_transfer_from_file_list)


@pytest.fixture
def mock_endpoints(mocker):
def id_mock(endpoint, tc):
return endpoint
mocker.patch("dkist.utils.globus.transfer.auto_activate_endpoint")
return mocker.patch("dkist.utils.globus.transfer.get_endpoint_id",
side_effect=id_mock)


@pytest.fixture
def mock_task_event_list(mocker):
task_list = [
GlobusResponse({
'DATA_TYPE': 'event',
'code': 'STARTED',
'description': 'started',
'details':
'{\n "type": "GridFTP Transfer", \n "concurrency": 2, \n "protocol": "Mode S"\n}',
'is_error': False,
'parent_task_id': None,
'time': '2019-05-16 10:13:26+00:00'
}),
GlobusResponse({
'DATA_TYPE': 'event',
'code': 'SUCCEEDED',
'description': 'succeeded',
'details': 'Scanned 100 file(s)',
'is_error': False,
'parent_task_id': None,
'time': '2019-05-16 10:13:24+00:00'
}),
GlobusResponse({
'DATA_TYPE': 'event',
'code': 'STARTED',
'description': 'started',
'details': 'Starting sync scan',
'is_error': False,
'parent_task_id': None,
'time': '2019-05-16 10:13:20+00:00'
})
]
return mocker.patch("globus_sdk.TransferClient.task_event_list",
return_value=task_list)


def test_start_transfer(mocker, transfer_client, mock_endpoints):
submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
return_value={"task_id": "task_id"})
file_list = list(map(pathlib.Path, ["/a/name.fits", "/a/name2.fits"]))
start_transfer_from_file_list("a", "b", "/", file_list)
calls = mock_endpoints.call_args_list
assert calls[0][0][0] == "a"
assert calls[1][0][0] == "b"

submit_mock.assert_called_once()
transfer_manifest = submit_mock.call_args_list[0][0][0]['DATA']

for filepath, tfr in zip(file_list, transfer_manifest):
assert str(filepath) == tfr['source_path']
assert "/" + filepath.name == tfr['destination_path']


def test_start_transfer_src_base(mocker, transfer_client, mock_endpoints):
submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
return_value={"task_id": "task_id"})
file_list = list(map(pathlib.Path, ["/a/b/name.fits", "/a/b/name2.fits"]))
start_transfer_from_file_list("a", "b", "/", file_list, "/a")
calls = mock_endpoints.call_args_list
assert calls[0][0][0] == "a"
assert calls[1][0][0] == "b"

submit_mock.assert_called_once()
transfer_manifest = submit_mock.call_args_list[0][0][0]['DATA']

for filepath, tfr in zip(file_list, transfer_manifest):
assert str(filepath) == tfr['source_path']
assert "/b/" + filepath.name == tfr['destination_path']


def test_process_event_list(transfer_client, mock_task_event_list):
(events,
json_events,
message_events) = _process_task_events("1234", set(), transfer_client)

assert isinstance(events, set)
assert all([isinstance(e, tuple) for e in events])
assert all([all([isinstance(item, tuple) for item in e]) for e in events])

print(events)
assert len(json_events) == 1
assert isinstance(json_events, tuple)
assert isinstance(json_events[0], dict)
assert isinstance(json_events[0]['details'], dict)
assert json_events[0]['code'] == 'STARTED'

assert len(message_events) == 2
assert isinstance(message_events, tuple)
assert isinstance(message_events[0], dict)
assert isinstance(message_events[0]['details'], str)


def test_process_event_list_message_only(transfer_client, mock_task_event_list):
# Filter out the json event
prev_events = tuple(map(lambda x: tuple(x.data.items()),
mock_task_event_list.return_value))
prev_events = set(prev_events[0:1])

(events,
json_events,
message_events) = _process_task_events("1234", prev_events, transfer_client)

assert isinstance(events, set)
assert all([isinstance(e, tuple) for e in events])
assert all([all([isinstance(item, tuple) for item in e]) for e in events])

assert len(json_events) == 0
assert isinstance(json_events, tuple)

assert len(message_events) == 2
assert isinstance(message_events, tuple)
assert isinstance(message_events[0], dict)
assert isinstance(message_events[0]['details'], str)


def test_get_speed():
speed = _get_speed({'code': "PROGRESS", 'details': {'mbps': 10}})
assert speed == 10
speed = _get_speed({})
assert speed is None
speed = _get_speed({'code': "progress", "details": "hello"})
assert speed is None
speed = _get_speed({'code': "progress", "details": {"hello": "world"}})
assert speed is None
Loading