Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable remote draining capability of Drones via REST interface #260

Merged
merged 12 commits into from
Aug 30, 2022
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ celerybeat-schedule

# virtualenv
.venv
venv/
venv*/
ENV/

# Spyder project settings
Expand Down
7 changes: 4 additions & 3 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Contributors ordered by number of commits:
==========================================
Manuel Giffels <[email protected]>
Max Fischer <[email protected]>
Alexander Haas <[email protected]>
Stefan Kroboth <[email protected]>
Eileen Kuehn <[email protected]>
matthias.schnepf <[email protected]>
Expand All @@ -10,11 +11,11 @@ Rene Caspart <[email protected]>
Leon Schuhmacher <[email protected]>
R. Florian von Cube <[email protected]>
mschnepf <[email protected]>
Alexander Haas <[email protected]>
mschnepf <[email protected]>
Matthias Schnepf <[email protected]>
Matthias J. Schnepf <[email protected]>
Matthias Schnepf <[email protected]>
Matthias Schnepf <[email protected]>
PSchuhmacher <[email protected]>
Peter Wienemann <[email protected]>
rfvc <[email protected]>
PSchuhmacher <[email protected]>
Alexander Haas <[email protected]>
7 changes: 4 additions & 3 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
.. Created by changelog.py at 2022-07-27, command
'/Users/giffler/.cache/pre-commit/repor6pnmwlm/py_env-default/bin/changelog docs/source/changes compile --output=docs/source/changelog.rst'
.. Created by changelog.py at 2022-08-29, command
'/Users/giffler/.cache/pre-commit/repor6pnmwlm/py_env-python3.10/bin/changelog docs/source/changes compile --output=docs/source/changelog.rst'
based on the format of 'https://keepachangelog.com/'

#########
CHANGELOG
#########

[Unreleased] - 2022-07-27
[Unreleased] - 2022-08-29
=========================

Added
-----

* Introduce a TARDIS REST API to query the state of resources from SqlRegistry
* Added support for manual draining of drones using the REST API
* Add support for passing environment variables as executable arguments to support HTCondor grid universe

Changed
Expand Down
8 changes: 8 additions & 0 deletions docs/source/changes/260.add_remote_drone_draining.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
category: added
summary: "Added support for manual draining of drones using the REST API"
description: |
Added limited support to synchronise the state stored in the ``SqliteRegistry`` with the current state of the drone.
RHofsaess marked this conversation as resolved.
Show resolved Hide resolved
This is only implemented for drones in ``AvailableState``, which can transition to ``DrainState`` via a remote update of the
``SqliteRegistry``, i.e. using the REST API.
RHofsaess marked this conversation as resolved.
Show resolved Hide resolved
pull requests:
- 260
9 changes: 9 additions & 0 deletions tardis/plugins/sqliteregistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ def execute(self, sql_query: str, bind_parameters: Dict) -> List[Dict]:
logger.debug(f"{sql_query},{bind_parameters} executed")
return cursor.fetchall()

async def get_resource_state(self, drone_uuid: str):
    """
    Query the registry for the state name recorded for a single drone.

    :param drone_uuid: Identifier of the drone whose state is looked up
    :return: The result of ``async_execute`` for the query below (the
        accompanying test expects a list holding one ``{"state": <name>}``
        entry, or an empty list for an unknown drone)
    """
    bind_parameters = {"drone_uuid": drone_uuid}
    # The query text is kept exactly as written, it is handed to the
    # database unchanged.
    sql_query = """
SELECT RS.state
FROM Resources R
JOIN ResourceStates RS ON R.state_id = RS.state_id
WHERE R.drone_uuid = :drone_uuid
"""
    return await self.async_execute(sql_query, bind_parameters)

def get_resources(self, site_name: str, machine_type: str) -> List[Dict]:
sql_query = """
SELECT R.remote_resource_uuid, R.drone_uuid, RS.state, R.created, R.updated
Expand Down
21 changes: 20 additions & 1 deletion tardis/resources/drone.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import List, Union, Optional
from typing import List, Union, Optional, Type

from tardis.agents.batchsystemagent import BatchSystemAgent
from tardis.agents.siteagent import SiteAgent
from tardis.interfaces.plugin import Plugin
from tardis.interfaces.state import State
from .dronestates import RequestState
from .dronestates import DownState
from ..plugins.sqliteregistry import SqliteRegistry
from ..utilities.attributedict import AttributeDict
from ..utilities.utils import load_states
from cobald.daemon import service
from cobald.interfaces import Pool

from backports.cached_property import cached_property
from datetime import datetime

import asyncio
Expand Down Expand Up @@ -60,6 +63,22 @@ def allocation(self) -> float:
def batch_system_agent(self) -> BatchSystemAgent:
return self._batch_system_agent

@cached_property
def _database(self) -> Optional[SqliteRegistry]:
    """
    Return the first registered plugin that is a ``SqliteRegistry``.

    The result, including ``None`` when no such plugin is registered, is
    cached by ``cached_property`` after the first access.
    """
    registries = (
        plugin for plugin in self._plugins if isinstance(plugin, SqliteRegistry)
    )
    return next(registries, None)

async def database_state(self) -> Optional[State]:
    """
    Fetch the state recorded for this drone in the ``SqliteRegistry``.

    The registry result is passed through ``load_states``, which replaces
    each state name by an *instance* of the matching drone state class, so
    the value handed back is a state instance and not a state class.

    :return: The state of the first registry entry for this drone, or
        ``None`` if the lookup raises ``IndexError`` or ``AttributeError``
        (for example an empty registry result, no ``SqliteRegistry`` plugin
        registered, or a missing ``drone_uuid`` resource attribute)
    """
    try:
        resource_states = await self._database.get_resource_state(
            self.resource_attributes.drone_uuid
        )
        return load_states(resource_states)[0]["state"]
    except (IndexError, AttributeError):
        return None

@property
def demand(self) -> float:
return self._demand
Expand Down
11 changes: 11 additions & 0 deletions tardis/resources/dronestates.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ async def batchsystem_machine_status(
return state_transition[machine_status]()


async def check_remote_draining(
    state_transition, drone: "Drone", current_state: Type[State]
):
    """
    Pipeline step that switches a drone to ``DrainState`` on remote request.

    The state stored for the drone in the database is compared with the
    state currently being processed. If the database asks for ``DrainState``
    and the drone is not already in that state, processing of the pipeline
    is stopped with a fresh ``DrainState`` instance as result.

    :param state_transition: Transition table passed along the pipeline
    :param drone: Drone whose database state is checked
    :param current_state: State class whose pipeline is currently running
    :return: ``state_transition`` unchanged if no remote draining is requested
    :raises StopProcessing: If the database state requests draining
    """
    database_state = await drone.database_state()

    # ``Drone.database_state`` yields a state *instance* (or ``None``), so an
    # identity check against the ``DrainState`` class would never match.
    # Normalise to the class and keep accepting a class for compatibility.
    if isinstance(database_state, type):
        database_state_class = database_state
    else:
        database_state_class = type(database_state)

    if database_state_class is DrainState and current_state is not DrainState:
        raise StopProcessing(last_result=DrainState())
    return state_transition


async def check_demand(state_transition, drone: "Drone", current_state: Type[State]):
if not drone.demand:
drone._supply = 0.0
Expand Down Expand Up @@ -154,6 +164,7 @@ class AvailableState(State):
}

processing_pipeline = [
check_remote_draining,
check_demand,
check_minimum_lifetime,
resource_status,
Expand Down
12 changes: 2 additions & 10 deletions tardis/resources/poolfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ..agents.siteagent import SiteAgent
from ..configuration.configuration import Configuration
from ..resources.drone import Drone
from ..utilities.utils import load_states

from cobald.composite.weighted import WeightedComposite
from cobald.composite.factory import FactoryPool
Expand All @@ -17,15 +18,6 @@
from importlib import import_module


def str_to_state(resources):
    """
    Replace the state name of every entry by a drone state instance.

    Each entry's ``"state"`` value is looked up by name in the module
    ``tardis.resources.dronestates`` and overwritten, in place, with a new
    instance of the class found there.

    :param resources: Iterable of mappings carrying a ``"state"`` key
    :return: The very same ``resources`` object that was passed in
    """
    for resource in resources:
        dronestates = import_module(name="tardis.resources.dronestates")
        state_name = f"{resource['state']}"
        resource["state"] = getattr(dronestates, state_name)()
    return resources


def create_composite_pool(configuration: str = None) -> WeightedComposite:
configuration = Configuration(configuration)

Expand Down Expand Up @@ -117,7 +109,7 @@ def get_drones_to_restore(plugins: dict, site, machine_type: str):
except KeyError:
return []
else:
return str_to_state(
return load_states(
sql_registry.get_resources(site_name=site.name, machine_type=machine_type)
)

Expand Down
9 changes: 9 additions & 0 deletions tardis/utilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ def machine_meta_data_translation(
raise


def load_states(resources):
    """
    Replace the state name of every entry by a drone state instance.

    Each entry's ``"state"`` value is converted to ``str``, looked up in
    ``tardis.resources.dronestates`` and overwritten, in place, with a new
    instance of the class found there.

    :param resources: Iterable of mappings carrying a ``"state"`` key
    :return: The very same ``resources`` object that was passed in
    """
    # NOTE(review): imported at call time rather than at module level,
    # presumably to avoid a circular import - confirm before moving it.
    import tardis.resources.dronestates

    dronestates = tardis.resources.dronestates
    for resource in resources:
        state_name = str(resource["state"])
        resource["state"] = getattr(dronestates, state_name)()
    return resources


def submit_cmd_option_formatter(options: AttributeDict) -> str:
option_prefix = dict(short="-", long="--")
option_separator = dict(short=" ", long="=")
Expand Down
22 changes: 22 additions & 0 deletions tests/plugins_t/test_sqliteregistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,28 @@ def test_double_schema_deployment(self):
SqliteRegistry()
SqliteRegistry()

@patch("tardis.plugins.sqliteregistry.logging", Mock())
def test_get_resource_state(self):
    """Check the state lookup for a known and for an unknown drone uuid."""
    self.registry.add_site(self.test_site_name)
    self.registry.add_machine_types(self.test_site_name, self.test_machine_type)
    run_async(self.registry.notify, RequestState(), self.test_resource_attributes)

    # (drone uuid to query, expected registry answer)
    expectations = (
        (self.test_resource_attributes["drone_uuid"], [{"state": "RequestState"}]),
        ("does_not_exists", []),
    )

    for drone_uuid, expected_states in expectations:
        self.assertEqual(
            run_async(self.registry.get_resource_state, drone_uuid=drone_uuid),
            expected_states,
        )

@patch("tardis.plugins.sqliteregistry.logging", Mock())
def test_get_resources(self):
self.registry.add_site(self.test_site_name)
Expand Down
34 changes: 32 additions & 2 deletions tests/resources_t/test_drone.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from ..utilities.utilities import async_return, run_async
from ..utilities.utilities import async_return, run_async, set_awaitable_return_value

from tardis.interfaces.plugin import Plugin
from tardis.interfaces.state import State
from tardis.resources.drone import Drone
from tardis.resources.dronestates import DownState
from tardis.resources.dronestates import DrainState, DownState
from tardis.plugins.sqliteregistry import SqliteRegistry
from tardis.utilities.attributedict import AttributeDict

from logging import DEBUG
Expand Down Expand Up @@ -49,6 +50,35 @@ def test_allocation(self):
def test_batch_system_agent(self):
self.assertEqual(self.drone.batch_system_agent, self.mock_batch_system_agent)

def test_database(self):
    """Check that ``_database`` finds a registered ``SqliteRegistry`` plugin."""
    # No registry plugin registered yet
    self.assertIsNone(self.drone._database)

    registry_mock = MagicMock(spec=SqliteRegistry)
    self.drone.register_plugins(registry_mock)
    # Drop the value stored by cached_property to force a fresh lookup
    del self.drone.__dict__["_database"]

    self.assertEqual(self.drone._database, registry_mock)

def test_database_state(self):
    """Check ``database_state`` with and without a usable registry answer."""
    # Without a registry plugin there is no database state
    self.assertIsNone(run_async(self.drone.database_state))

    registry_mock = MagicMock(spec=SqliteRegistry)
    self.drone.register_plugins(registry_mock)
    # Drop the value stored by cached_property to force a fresh lookup
    del self.drone.__dict__["_database"]

    set_awaitable_return_value(
        registry_mock.get_resource_state, [{"state": "DrainState"}]
    )
    database_state = run_async(self.drone.database_state)
    self.assertIsInstance(database_state, DrainState)

    # An empty registry answer triggers the IndexError branch
    set_awaitable_return_value(registry_mock.get_resource_state, [])
    self.assertIsNone(run_async(self.drone.database_state))

    # A missing drone_uuid attribute triggers the AttributeError branch
    del self.drone.resource_attributes.drone_uuid
    self.assertIsNone(run_async(self.drone.database_state))

def test_demand(self):
self.assertEqual(self.drone.demand, 8)
self.drone.demand = 0
Expand Down
8 changes: 8 additions & 0 deletions tests/resources_t/test_dronestates.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ def test_integrating_state(self):
self.run_the_matrix(matrix, initial_state=IntegratingState)

def test_available_state(self):
self.drone.database_state.return_value = async_return()

matrix = [
(ResourceStatus.Running, MachineStatus.Available, AvailableState),
(ResourceStatus.Running, MachineStatus.NotAvailable, ShutDownState),
Expand Down Expand Up @@ -223,6 +225,12 @@ def test_available_state(self):
run_async(self.drone.state.return_value.run, self.drone)
self.assertIsInstance(self.drone.state, DrainState)

# Test remote draining procedure via REST service and database
self.drone.database_state.return_value = async_return(return_value=DrainState)
self.drone.state.return_value = AvailableState()
run_async(self.drone.state.return_value.run, self.drone)
self.assertIsInstance(self.drone.state, DrainState)

def test_drain_state(self):
self.drone.state.return_value = DrainState
run_async(self.drone.state.return_value.run, self.drone)
Expand Down
7 changes: 0 additions & 7 deletions tests/resources_t/test_poolfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from tardis.resources.poolfactory import create_drone
from tardis.resources.poolfactory import get_drones_to_restore
from tardis.resources.poolfactory import load_plugins
from tardis.resources.poolfactory import str_to_state
from tardis.utilities.attributedict import AttributeDict

from unittest import TestCase
Expand Down Expand Up @@ -44,12 +43,6 @@ def setUp(self):
sqlite_registry = self.mock_sqliteregistry.return_value
sqlite_registry.get_resources.return_value = [{"state": "RequestState"}]

def test_str_to_state(self):
    """Check that state names are converted while other keys are untouched."""
    test = [{"state": "RequestState", "drone_uuid": "test-abc123"}]
    converted_test = str_to_state(test)
    # assertTrue(value, RequestState) would only use the class as failure
    # message and pass for any truthy value, so check the type explicitly.
    self.assertIsInstance(converted_test[0]["state"], RequestState)
    self.assertEqual(converted_test[0]["drone_uuid"], "test-abc123")

@patch("tardis.resources.poolfactory.FactoryPool")
@patch("tardis.resources.poolfactory.Logger")
@patch("tardis.resources.poolfactory.Standardiser")
Expand Down
11 changes: 11 additions & 0 deletions tests/utilities/utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from tardis.utilities.attributedict import AttributeDict

import asyncio
import socket

Expand Down Expand Up @@ -39,3 +40,13 @@ def wrapper(self):
def run_async(coroutine, *args, **kwargs):
    """
    Run a coroutine function to completion and return its result.

    :param coroutine: Coroutine function to call
    :param args: Positional arguments forwarded to ``coroutine``
    :param kwargs: Keyword arguments forwarded to ``coroutine``
    :return: The result of awaiting ``coroutine(*args, **kwargs)``
    """
    event_loop = asyncio.get_event_loop_policy().get_event_loop()
    awaitable = coroutine(*args, **kwargs)
    return event_loop.run_until_complete(awaitable)


def set_awaitable_return_value(mocked_coroutine, return_value):
    """
    Configure the value obtained when awaiting a call of a mocked coroutine.

    :param mocked_coroutine: Mock standing in for a coroutine function
    :param return_value: Value the awaited call should produce
    """
    # The class is identified by name on purpose: isinstance does not work
    # due to inheritance, isawaitable/iscoroutine trigger a RuntimeWarning
    # (not awaited) and comparing type(...) did not solve the problem either.
    mock_class_name = mocked_coroutine.__class__.__name__
    if mock_class_name == "MagicMock":
        # Needed to pass the tests on Python 3.6 and 3.7: wrap the value so
        # that the call result can be awaited.
        mocked_coroutine.return_value = async_return(return_value=return_value)
    else:
        mocked_coroutine.return_value = return_value
10 changes: 10 additions & 0 deletions tests/utilities_t/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import logging

from tardis.resources.dronestates import RequestState
from tardis.utilities.attributedict import AttributeDict
from tardis.utilities.utils import (
csv_parser,
disable_logging,
htcondor_cmd_option_formatter,
load_states,
submit_cmd_option_formatter,
)

Expand Down Expand Up @@ -116,6 +118,14 @@ def test_disable_logging(self):
logging.debug("Test")


class TestStrToState(TestCase):
    """Tests for the conversion of state names into drone state instances."""

    def test_str_to_state(self):
        """Check that state names are converted while other keys are untouched."""
        test = [{"state": "RequestState", "drone_uuid": "test-abc123"}]
        converted_test = load_states(test)
        # assertTrue(value, RequestState) would only use the class as failure
        # message and pass for any truthy value, so check the type explicitly.
        self.assertIsInstance(converted_test[0]["state"], RequestState)
        self.assertEqual(converted_test[0]["drone_uuid"], "test-abc123")


class TestSlurmCMDOptionFormatter(TestCase):
def test_submit_cmd_option_formatter(self):
options = AttributeDict()
Expand Down