Skip to content

Commit

Permalink
Merge pull request #55 from Cadair/globus_transfer
Browse files Browse the repository at this point in the history
Add support for Globus transfer and monitoring
  • Loading branch information
Cadair authored May 16, 2019
2 parents 2d0a60d + a75b8a6 commit 2a176de
Show file tree
Hide file tree
Showing 8 changed files with 408 additions and 15 deletions.
1 change: 1 addition & 0 deletions changelog/55.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for starting and monitoring Globus transfer tasks
17 changes: 17 additions & 0 deletions dkist/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,20 @@

# This sub-module is destined for common non-package specific utility
# functions.


def in_notebook(): # pragma: no cover
"""
Attempts to detect if this python process is connected to a Jupyter Notebook.
"""
try:
import ipykernel.zmqshell
shell = get_ipython() # noqa
if isinstance(shell, ipykernel.zmqshell.ZMQInteractiveShell):
# Check that we can import the right widget
from tqdm import _tqdm_notebook
_tqdm_notebook.IntProgress
return True
return False
except Exception:
return False
1 change: 1 addition & 0 deletions dkist/utils/globus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
"""
from .auth import *
from .endpoints import *
from .transfer import *
10 changes: 5 additions & 5 deletions dkist/utils/globus/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,18 @@ def get_endpoint_id(endpoint, tfr_client):


@ensure_globus_authorized
def auto_activate_endpoint(tfr_client, endpoint_id): # pragma: no cover
def auto_activate_endpoint(endpoint_id, tfr_client): # pragma: no cover
"""
Perform activation of a Globus endpoint.
Parameters
----------
tfr_client : `globus_sdk.TransferClient`
The transfer client to use for the activation.
endpoint_id : `str`
The uuid of the endpoint to activate.
tfr_client : `globus_sdk.TransferClient`
The transfer client to use for the activation.
"""
activation = tfr_client.endpoint_get_activation_requirements(endpoint_id)
needs_activation = bool(activation['DATA'])
Expand Down Expand Up @@ -160,7 +160,7 @@ def get_directory_listing(path, endpoint=None):

if endpoint_id is None:
endpoint_id = get_endpoint_id(endpoint, tc)
auto_activate_endpoint(tc, endpoint_id)
auto_activate_endpoint(endpoint_id, tc)

response = tc.operation_ls(endpoint_id, path=path.as_posix())
names = [r['name'] for r in response]
Expand Down
20 changes: 20 additions & 0 deletions dkist/utils/globus/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from unittest import mock
import pytest

from dkist.utils.globus.endpoints import get_transfer_client


@pytest.fixture()
def transfer_client(mocker):
mocker.patch("globus_sdk.TransferClient.get_submission_id",
return_value={'value': "1234"})

tc = get_transfer_client()

mocker.patch("dkist.utils.globus.endpoints.get_transfer_client",
return_value=tc)
mocker.patch("dkist.utils.globus.transfer.get_transfer_client",
return_value=tc)
mocker.patch("dkist.utils.globus.auth.get_refresh_token_authorizer",
return_value=None)
return tc
10 changes: 0 additions & 10 deletions dkist/utils/globus/tests/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@ def ls_response(mocker):
return resp


@pytest.fixture
def transfer_client(mocker):
tc = get_transfer_client()
mocker.patch("dkist.utils.globus.endpoints.get_transfer_client",
return_value=tc)
mocker.patch("dkist.utils.globus.auth.get_refresh_token_authorizer",
return_value=None)
return tc


@pytest.fixture
def mock_search(mocker):
mocker.patch("globus_sdk.TransferClient.endpoint_search",
Expand Down
143 changes: 143 additions & 0 deletions dkist/utils/globus/tests/test_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import pathlib
from unittest import mock

import pytest
from globus_sdk import GlobusResponse

from dkist.utils.globus.transfer import (_get_speed, _process_task_events,
start_transfer_from_file_list)


@pytest.fixture
def mock_endpoints(mocker):
def id_mock(endpoint, tc):
return endpoint
mocker.patch("dkist.utils.globus.transfer.auto_activate_endpoint")
return mocker.patch("dkist.utils.globus.transfer.get_endpoint_id",
side_effect=id_mock)


@pytest.fixture
def mock_task_event_list(mocker):
task_list = [
GlobusResponse({
'DATA_TYPE': 'event',
'code': 'STARTED',
'description': 'started',
'details':
'{\n "type": "GridFTP Transfer", \n "concurrency": 2, \n "protocol": "Mode S"\n}',
'is_error': False,
'parent_task_id': None,
'time': '2019-05-16 10:13:26+00:00'
}),
GlobusResponse({
'DATA_TYPE': 'event',
'code': 'SUCCEEDED',
'description': 'succeeded',
'details': 'Scanned 100 file(s)',
'is_error': False,
'parent_task_id': None,
'time': '2019-05-16 10:13:24+00:00'
}),
GlobusResponse({
'DATA_TYPE': 'event',
'code': 'STARTED',
'description': 'started',
'details': 'Starting sync scan',
'is_error': False,
'parent_task_id': None,
'time': '2019-05-16 10:13:20+00:00'
})
]
return mocker.patch("globus_sdk.TransferClient.task_event_list",
return_value=task_list)


def test_start_transfer(mocker, transfer_client, mock_endpoints):
submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
return_value={"task_id": "task_id"})
file_list = list(map(pathlib.Path, ["/a/name.fits", "/a/name2.fits"]))
start_transfer_from_file_list("a", "b", "/", file_list)
calls = mock_endpoints.call_args_list
assert calls[0][0][0] == "a"
assert calls[1][0][0] == "b"

submit_mock.assert_called_once()
transfer_manifest = submit_mock.call_args_list[0][0][0]['DATA']

for filepath, tfr in zip(file_list, transfer_manifest):
assert str(filepath) == tfr['source_path']
assert "/" + filepath.name == tfr['destination_path']


def test_start_transfer_src_base(mocker, transfer_client, mock_endpoints):
submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
return_value={"task_id": "task_id"})
file_list = list(map(pathlib.Path, ["/a/b/name.fits", "/a/b/name2.fits"]))
start_transfer_from_file_list("a", "b", "/", file_list, "/a")
calls = mock_endpoints.call_args_list
assert calls[0][0][0] == "a"
assert calls[1][0][0] == "b"

submit_mock.assert_called_once()
transfer_manifest = submit_mock.call_args_list[0][0][0]['DATA']

for filepath, tfr in zip(file_list, transfer_manifest):
assert str(filepath) == tfr['source_path']
assert "/b/" + filepath.name == tfr['destination_path']


def test_process_event_list(transfer_client, mock_task_event_list):
(events,
json_events,
message_events) = _process_task_events("1234", set(), transfer_client)

assert isinstance(events, set)
assert all([isinstance(e, tuple) for e in events])
assert all([all([isinstance(item, tuple) for item in e]) for e in events])

print(events)
assert len(json_events) == 1
assert isinstance(json_events, tuple)
assert isinstance(json_events[0], dict)
assert isinstance(json_events[0]['details'], dict)
assert json_events[0]['code'] == 'STARTED'

assert len(message_events) == 2
assert isinstance(message_events, tuple)
assert isinstance(message_events[0], dict)
assert isinstance(message_events[0]['details'], str)


def test_process_event_list_message_only(transfer_client, mock_task_event_list):
# Filter out the json event
prev_events = tuple(map(lambda x: tuple(x.data.items()),
mock_task_event_list.return_value))
prev_events = set(prev_events[0:1])

(events,
json_events,
message_events) = _process_task_events("1234", prev_events, transfer_client)

assert isinstance(events, set)
assert all([isinstance(e, tuple) for e in events])
assert all([all([isinstance(item, tuple) for item in e]) for e in events])

assert len(json_events) == 0
assert isinstance(json_events, tuple)

assert len(message_events) == 2
assert isinstance(message_events, tuple)
assert isinstance(message_events[0], dict)
assert isinstance(message_events[0]['details'], str)


def test_get_speed():
speed = _get_speed({'code': "PROGRESS", 'details': {'mbps': 10}})
assert speed == 10
speed = _get_speed({})
assert speed is None
speed = _get_speed({'code': "progress", "details": "hello"})
assert speed is None
speed = _get_speed({'code': "progress", "details": {"hello": "world"}})
assert speed is None
Loading

0 comments on commit 2a176de

Please sign in to comment.