From 04952f3581040768b52ca785b8f8403c4529e3b8 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 16 Nov 2023 14:13:48 +0100 Subject: [PATCH 01/20] add extra redis DB --- packages/service-library/src/servicelib/redis.py | 3 --- packages/settings-library/src/settings_library/redis.py | 1 + services/docker-compose-ops.yml | 2 +- services/docker-compose.yml | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/service-library/src/servicelib/redis.py b/packages/service-library/src/servicelib/redis.py index 32b87149c5b..2cb393e0dff 100644 --- a/packages/service-library/src/servicelib/redis.py +++ b/packages/service-library/src/servicelib/redis.py @@ -21,9 +21,6 @@ from .logging_utils import log_catch, log_context _DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10) -_CANCEL_TASK_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=0.1) -_MINUTE: Final[NonNegativeFloat] = 60 -_WAIT_SECS: Final[NonNegativeFloat] = 1 logger = logging.getLogger(__name__) diff --git a/packages/settings-library/src/settings_library/redis.py b/packages/settings-library/src/settings_library/redis.py index e973630cb02..30a95ce098f 100644 --- a/packages/settings-library/src/settings_library/redis.py +++ b/packages/settings-library/src/settings_library/redis.py @@ -15,6 +15,7 @@ class RedisDatabase(int, Enum): SCHEDULED_MAINTENANCE = 3 USER_NOTIFICATIONS = 4 ANNOUNCEMENTS = 5 + DISTRIBUTED_IDENTIFIERS = 6 class RedisSettings(BaseCustomSettings): diff --git a/services/docker-compose-ops.yml b/services/docker-compose-ops.yml index f2651e92a63..57ba3575f14 100644 --- a/services/docker-compose-ops.yml +++ b/services/docker-compose-ops.yml @@ -86,7 +86,7 @@ services: image: rediscommander/redis-commander:latest init: true environment: - - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5 + - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5,distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6 # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml ports: - "18081:8081" diff --git a/services/docker-compose.yml b/services/docker-compose.yml index e8f63bbaaf7..e4c4510fcfd 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -920,7 +920,7 @@ services: "--loglevel", "verbose", "--databases", - "6", + "7", "--appendonly", "yes" ] From 6be44563463be9d6b00c834678b94c0a4beb3b31 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 16 Nov 2023 15:57:45 +0100 Subject: [PATCH 02/20] refactored distributed identifier --- .../utils/distributed_identifer.py | 236 ++++++++++++-- .../unit/test_utils_distributed_identifier.py | 294 +++++++++++++++--- 2 files changed, 469 insertions(+), 61 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py index 05d84a72f08..2659d1d56f9 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py +++ 
b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py @@ -1,41 +1,98 @@ import logging from abc import ABC, abstractmethod -from typing import Generic, TypeVar +from asyncio import Task +from datetime import timedelta +from typing import Final, Generic, TypeVar +from pydantic import NonNegativeInt +from servicelib.background_task import start_periodic_task, stop_periodic_task from servicelib.logging_utils import log_catch, log_context +from servicelib.redis import RedisClientSDK, RedisDatabase +from servicelib.utils import logged_gather _logger = logging.getLogger(__name__) +_REDIS_MAX_CONCURRENCY: Final[NonNegativeInt] = 10 +_DEFAULT_CLEANUP_INTERVAL: Final[timedelta] = timedelta(minutes=1) + Ident = TypeVar("Ident") Res = TypeVar("Res") +# Provided at the moment of creation. +# Can be used inside ``is_present`` and ``remove``. +CleanupContext = TypeVar("CleanupContext") -class BaseDistributedIdentifierManager(ABC, Generic[Ident, Res]): - """Common interface used to manage the lifecycle of osparc resources. - An osparc resource can be anything that needs to be created and then removed - during the runtime of the platform. +class BaseDistributedIdentifierManager(ABC, Generic[Ident, Res, CleanupContext]): + # TODO: add docstring here - Safe remove the resource. + @classmethod + @abstractmethod + def _deserialize_identifier(cls, raw: str) -> Ident: + """User provided deserialization for the identifier - For usage check ``packages/service-library/tests/test_osparc_generic_resource.py`` - """ + Arguments: + raw -- stream to be deserialized + + Returns: + an identifier object + """ + @classmethod @abstractmethod - async def get(self, identifier: Ident, **extra_kwargs) -> Res | None: - """Returns a resource if exists. + def _serialize_identifier(cls, identifier: Ident) -> str: + """User provided serialization for the identifier + + Arguments: + cleanup_context -- user defined identifier object + + Returns: + object encoded as string + """ + + @classmethod + @abstractmethod + def _deserialize_cleanup_context(cls, raw: str) -> CleanupContext: + """User provided deserialization for the context + + Arguments: + raw -- stream to be deserialized + + Returns: + an object of the type chosen by the user + """ + + @classmethod + @abstractmethod + def _serialize_cleanup_context(cls, cleanup_context: CleanupContext) -> str: + """User provided serialization for the context + + Arguments: + cleanup_context -- user defined cleanup context object + + Returns: + object encoded as string + """ + + @abstractmethod + async def is_used(self, identifier: Ident, cleanup_context: CleanupContext) -> bool: + """Check if the resource associated to the ``identifier`` is + still being used. + # NOTE: a resource can be created but not in use. Arguments: identifier -- user chosen identifier for the resource - **extra_kwargs -- can be overloaded by the user + cleanup_context -- user defined CleanupContext object Returns: - None if the resource does not exit + True if ``identifier`` is still being used """ @abstractmethod - async def create(self, **extra_kwargs) -> tuple[Ident, Res]: - """Used for creating the resources + async def _create(self, **extra_kwargs) -> tuple[Ident, Res]: + """Used INTERNALLY for creating the resources. + # NOTE: should not be used directly, use the public + version ``create`` instead. 
Arguments: **extra_kwargs -- can be overloaded by the user @@ -45,25 +102,164 @@ async def create(self, **extra_kwargs) -> tuple[Ident, Res]: """ @abstractmethod - async def destroy(self, identifier: Ident, **extra_kwargs) -> None: - """Used to destroy an existing resource + async def get(self, identifier: Ident, **extra_kwargs) -> Res | None: + """If exists, returns the resource. Arguments: identifier -- user chosen identifier for the resource **extra_kwargs -- can be overloaded by the user + + Returns: + None if the resource does not exit """ - async def remove( - self, identifier: Ident, *, reraise: bool = False, **extra_kwargs + @abstractmethod + async def _destroy( + self, identifier: Ident, cleanup_context: CleanupContext ) -> None: + """Used to destroy an existing resource + # NOTE: should not be used directly, use the public + version ``remove`` instead. + + Arguments: + identifier -- user chosen identifier for the resource + cleanup_context -- user defined CleanupContext object + """ + + # PUBLIC + + async def create( + self, *, cleanup_context: CleanupContext, **extra_kwargs + ) -> tuple[Ident, Res]: + """Used for creating the resources + + Arguments: + cleanup_context -- user defined CleanupContext object + **extra_kwargs -- can be overloaded by the user + + Returns: + tuple[identifier for the resource, resource object] + """ + identifier, result = await self._create(**extra_kwargs) + await self.redis_client_sdk.redis.set( + self._to_redis_key(identifier), + self._serialize_cleanup_context(cleanup_context), + ) + return identifier, result + + async def remove(self, identifier: Ident, *, reraise: bool = False) -> None: """Attempts to remove the resource, if an error occurs it is logged. Arguments: identifier -- user chosen identifier for the resource reraise -- when True raises any exception raised by ``destroy`` (default: {False}) - **extra_kwargs -- can be overloaded by the user """ + + cleanup_context = await self._get_identifier_context(identifier) + if cleanup_context is None: + _logger.warning( + "Something went wrong, did not find any context for %s", identifier + ) + return + with log_context( _logger, logging.DEBUG, f"{self.__class__}: removing {identifier}" ), log_catch(_logger, reraise=reraise): - await self.destroy(identifier, **extra_kwargs) + await self._destroy(identifier, cleanup_context) + + async def setup(self) -> None: + self._cleanup_task = start_periodic_task( + self._cleanup_unused_identifiers, + interval=self.cleanup_interval, + task_name="cleanup_unused_identifiers_task", + ) + + async def shutdown(self) -> None: + if self._cleanup_task: + await stop_periodic_task(self._cleanup_task, timeout=5) + + # UTILS + + @classmethod + @property + def class_path(cls) -> str: + return f"{cls.__module__}.{cls.__name__}" + + @classmethod + @property + def _redis_key_prefix(cls) -> str: + return f"{cls.class_path}:" + + @classmethod + def _to_redis_key(cls, identifier: Ident) -> str: + return f"{cls._redis_key_prefix}{cls._serialize_identifier(identifier)}" + + @classmethod + def _from_redis_key(cls, redis_key: str) -> Ident: + return cls._deserialize_identifier( + redis_key.removeprefix(cls._redis_key_prefix) + ) + + # CORE + + def __init__( + self, + redis_client_sdk: RedisClientSDK, + *, + cleanup_interval: timedelta = _DEFAULT_CLEANUP_INTERVAL, + ) -> None: + # TODO: add docstring + + if not redis_client_sdk.redis_dsn.endswith( + f"{RedisDatabase.DISTRIBUTED_IDENTIFIERS}" + ): + msg = ( + f"Redis endpoint {redis_client_sdk.redis_dsn} contains the wrong database." 
+ f"Expected {RedisDatabase.DISTRIBUTED_IDENTIFIERS}" + ) + raise TypeError(msg) + + self.redis_client_sdk = redis_client_sdk + self.cleanup_interval = cleanup_interval + + self._cleanup_task: Task | None = None + + async def _get_identifier_context(self, identifier: Ident) -> CleanupContext | None: + raw: bytes | None = await self.redis_client_sdk.redis.get( + self._to_redis_key(identifier) + ) + return self._deserialize_cleanup_context(raw) if raw else None + + async def _get_tracked(self) -> dict[Ident, CleanupContext]: + identifiers: list[Ident] = [ + self._from_redis_key(redis_key) + for redis_key in await self.redis_client_sdk.redis.keys( + f"{self._redis_key_prefix}*" + ) + ] + + cleanup_contexts: list[CleanupContext | None] = await logged_gather( + *(self._get_identifier_context(identifier) for identifier in identifiers), + max_concurrency=_REDIS_MAX_CONCURRENCY, + ) + + return { + identifier: cleanup_context + for identifier, cleanup_context in zip( + identifiers, cleanup_contexts, strict=True + ) + # NOTE: cleanup_context will be None if the key was removed before + # recovering all the cleanup_contexts + if cleanup_context is not None + } + + async def _cleanup_unused_identifiers(self) -> None: + # removes no longer used identifiers + tracked_data: dict[Ident, CleanupContext] = await self._get_tracked() + _logger.info("Will remove unused %s", list(tracked_data.keys())) + + for identifier, cleanup_context in tracked_data.items(): + if await self.is_used(identifier, cleanup_context): + continue + + await self.remove(identifier) diff --git a/services/director-v2/tests/unit/test_utils_distributed_identifier.py b/services/director-v2/tests/unit/test_utils_distributed_identifier.py index 9d645e7b1da..85c2078a54b 100644 --- a/services/director-v2/tests/unit/test_utils_distributed_identifier.py +++ b/services/director-v2/tests/unit/test_utils_distributed_identifier.py @@ -1,105 +1,220 @@ +# pylint:disable=protected-access # pylint:disable=redefined-outer-name -import random +import asyncio import string -from typing import Any +from collections.abc import AsyncIterable, AsyncIterator +from dataclasses import dataclass +from secrets import choice +from typing import Final from uuid import UUID, uuid4 import pytest +from pydantic import BaseModel, NonNegativeInt, StrBytes from pytest_mock import MockerFixture +from servicelib.redis import RedisClientSDK +from servicelib.utils import logged_gather +from settings_library.redis import RedisDatabase, RedisSettings from simcore_service_director_v2.utils.distributed_identifer import ( BaseDistributedIdentifierManager, ) +pytest_simcore_core_services_selection = [ + "redis", +] + +pytest_simcore_ops_services_selection = [ + # "redis-commander", +] + +# if this goes too high, max open file limit is reached +_MAX_REDIS_CONCURRENCY: Final[NonNegativeInt] = 1000 + -# define a custom type of ID for the API class UserDefinedID: + # define a custom type of ID for the API + # by choice it is hard to serialize/deserialize + def __init__(self, uuid: UUID | None = None) -> None: self._id = uuid if uuid else uuid4() def __eq__(self, other: "UserDefinedID") -> bool: return self._id == other._id + # only necessary for nice looking IDs in the logs + def __repr__(self) -> str: + return f"" + + # only necessary for RandomTextAPI def __hash__(self): return hash(str(self._id)) +class RandomTextEntry(BaseModel): + text: str + + @classmethod + def create(cls, length: int) -> "RandomTextEntry": + letters_and_digits = string.ascii_letters + string.digits + text = 
"".join(choice(letters_and_digits) for _ in range(length)) + return cls(text=text) + + # mocked api interface class RandomTextAPI: def __init__(self) -> None: - self._created: dict[UserDefinedID, Any] = {} - - @staticmethod - def _random_string(length: int) -> str: - letters_and_digits = string.ascii_letters + string.digits - return "".join( - random.choice(letters_and_digits) for _ in range(length) # noqa: S311 - ) + self._created: dict[UserDefinedID, RandomTextEntry] = {} - def create(self, length: int) -> tuple[UserDefinedID, Any]: + def create(self, length: int) -> tuple[UserDefinedID, RandomTextEntry]: identifier = UserDefinedID(uuid4()) - self._created[identifier] = self._random_string(length) + self._created[identifier] = RandomTextEntry.create(length) return identifier, self._created[identifier] def delete(self, identifier: UserDefinedID) -> None: del self._created[identifier] - def get(self, identifier: UserDefinedID) -> Any | None: + def get(self, identifier: UserDefinedID) -> RandomTextEntry | None: return self._created.get(identifier, None) +@dataclass +class ComponentUsingRandomText: + # this one is tracking if the above resources are bring used + + _in_use: bool = True + + def is_used(self, an_id: UserDefinedID) -> bool: + _ = an_id + return self._in_use + + def toggle_is_present(self, in_use: bool) -> None: + self._in_use = in_use + + +class RandomTextCleanupContext(BaseModel): + # when cleaning up, extra parameters might be required + # in this case there aren't any since they are not required. + ... + + # define a custom manager using the custom user defined identifiers -# NOTE: note that the generic uses `[UserDefinedID, Any]` +# NOTE: note that the generic uses `[UserDefinedID, RandomTextEntry, RandomTextCleanupContext]` # which enforces typing constraints on the overloaded abstract methods -class RandomTextResourcesManager(BaseDistributedIdentifierManager[UserDefinedID, Any]): - # pylint:disable=arguments-differ - - def __init__(self) -> None: +class RandomTextResourcesManager( + BaseDistributedIdentifierManager[ + UserDefinedID, RandomTextEntry, RandomTextCleanupContext + ] +): + def __init__( + self, + redis_client_sdk: RedisClientSDK, + component_using_random_text: ComponentUsingRandomText, + ) -> None: + # THESE two systems would normally come stored in the `app` context self.api = RandomTextAPI() + self.component_using_random_text = component_using_random_text - async def get(self, identifier: UserDefinedID, **_) -> Any | None: - return self.api.get(identifier) + super().__init__(redis_client_sdk) + + @classmethod + def _deserialize_identifier(cls, raw: str) -> UserDefinedID: + return UserDefinedID(UUID(raw)) + + @classmethod + def _serialize_identifier(cls, identifier: UserDefinedID) -> str: + return f"{identifier._id}" # noqa: SLF001 - async def create(self, length: int) -> tuple[UserDefinedID, Any]: + @classmethod + def _deserialize_cleanup_context(cls, raw: StrBytes) -> RandomTextCleanupContext: + return RandomTextCleanupContext.parse_raw(raw) + + @classmethod + def _serialize_cleanup_context( + cls, cleanup_context: RandomTextCleanupContext + ) -> str: + return cleanup_context.json() + + async def is_used( + self, identifier: UserDefinedID, cleanup_context: RandomTextCleanupContext + ) -> bool: + _ = cleanup_context + return self.component_using_random_text.is_used(identifier) + + async def _create( # pylint:disable=arguments-differ # type:ignore [override] + self, length: int + ) -> tuple[UserDefinedID, RandomTextEntry]: return self.api.create(length) - async 
def destroy(self, identifier: UserDefinedID) -> None: + async def get(self, identifier: UserDefinedID, **_) -> RandomTextEntry | None: + return self.api.get(identifier) + + async def _destroy( + self, identifier: UserDefinedID, _: RandomTextCleanupContext + ) -> None: self.api.delete(identifier) @pytest.fixture -def manager() -> RandomTextResourcesManager: - return RandomTextResourcesManager() +async def redis_client_sdk( + redis_service: RedisSettings, +) -> AsyncIterator[RedisClientSDK]: + redis_resources_dsn = redis_service.build_redis_dsn( + RedisDatabase.DISTRIBUTED_IDENTIFIERS + ) + + client = RedisClientSDK(redis_resources_dsn) + assert client + assert client.redis_dsn == redis_resources_dsn + await client.setup() + # cleanup, previous run's leftovers + await client.redis.flushall() + + yield client + # cleanup, properly close the clients + await client.redis.flushall() + await client.shutdown() -async def test_resource_is_missing(manager: RandomTextResourcesManager): - missing_identifier = UserDefinedID() - assert await manager.get(missing_identifier) is None +@pytest.fixture +def component_using_random_text() -> ComponentUsingRandomText: + return ComponentUsingRandomText() -async def test_manual_workflow(manager: RandomTextResourcesManager): - # creation - identifier, _ = await manager.create(length=1) - assert await manager.get(identifier) is not None +@pytest.fixture +async def manager_with_no_cleanup_task( + redis_client_sdk: RedisClientSDK, + component_using_random_text: ComponentUsingRandomText, +) -> RandomTextResourcesManager: + return RandomTextResourcesManager(redis_client_sdk, component_using_random_text) - # removal - await manager.destroy(identifier) - # resource no longer exists - assert await manager.get(identifier) is None +@pytest.fixture +async def manager( + manager_with_no_cleanup_task: RandomTextResourcesManager, +) -> AsyncIterable[RandomTextResourcesManager]: + await manager_with_no_cleanup_task.setup() + yield manager_with_no_cleanup_task + await manager_with_no_cleanup_task.shutdown() + + +async def test_resource_is_missing(manager: RandomTextResourcesManager): + missing_identifier = UserDefinedID() + assert await manager.get(missing_identifier) is None @pytest.mark.parametrize("delete_before_removal", [True, False]) -async def test_automatic_cleanup_workflow( +async def test_full_workflow( manager: RandomTextResourcesManager, delete_before_removal: bool ): # creation - identifier, _ = await manager.create(length=1) + identifier, _ = await manager.create( + cleanup_context=RandomTextCleanupContext(), length=1 + ) assert await manager.get(identifier) is not None # optional removal if delete_before_removal: - await manager.destroy(identifier) + await manager.remove(identifier) is_still_present = not delete_before_removal assert (await manager.get(identifier) is not None) is is_still_present @@ -121,10 +236,12 @@ async def test_remove_raises_error( caplog.clear() error_message = "mock error during resource destroy" - mocker.patch.object(manager, "destroy", side_effect=RuntimeError(error_message)) + mocker.patch.object(manager, "_destroy", side_effect=RuntimeError(error_message)) # after creation object is present - identifier, _ = await manager.create(length=1) + identifier, _ = await manager.create( + cleanup_context=RandomTextCleanupContext(), length=1 + ) assert await manager.get(identifier) is not None if reraise: @@ -135,3 +252,98 @@ async def test_remove_raises_error( # check logs in case of error assert "Unhandled exception:" in caplog.text assert 
error_message in caplog.text + + +async def _create_resources( + manager: RandomTextResourcesManager, count: int +) -> list[UserDefinedID]: + creation_results: list[tuple[UserDefinedID, RandomTextEntry]] = await logged_gather( + *[ + manager.create(cleanup_context=RandomTextCleanupContext(), length=1) + for _ in range(count) + ], + max_concurrency=_MAX_REDIS_CONCURRENCY, + ) + return [x[0] for x in creation_results] + + +async def _assert_all_resources( + manager: RandomTextResourcesManager, + identifiers: list[UserDefinedID], + *, + exist: bool, +) -> None: + get_results: list[RandomTextEntry | None] = await logged_gather( + *[manager.get(identifier) for identifier in identifiers], + max_concurrency=_MAX_REDIS_CONCURRENCY, + ) + if exist: + assert all(x is not None for x in get_results) + else: + assert all(x is None for x in get_results) + + +@pytest.mark.parametrize("count", [1000]) +async def test_parallel_create_remove(manager: RandomTextResourcesManager, count: int): + # create resources + identifiers: list[UserDefinedID] = await _create_resources(manager, count) + await _assert_all_resources(manager, identifiers, exist=True) + + # safe remove the resources, they do not exist any longer + await asyncio.gather(*[manager.remove(identifier) for identifier in identifiers]) + await _assert_all_resources(manager, identifiers, exist=False) + + +async def test_background_removal_of_unused_resources( + manager_with_no_cleanup_task: RandomTextResourcesManager, + component_using_random_text: ComponentUsingRandomText, +): + # create resources + identifiers: list[UserDefinedID] = await _create_resources( + manager_with_no_cleanup_task, 10_000 + ) + await _assert_all_resources(manager_with_no_cleanup_task, identifiers, exist=True) + + # call cleanup, all resources still exist + await manager_with_no_cleanup_task._cleanup_unused_identifiers() # noqa: SLF001 + await _assert_all_resources(manager_with_no_cleanup_task, identifiers, exist=True) + + # make resources unused in external system + component_using_random_text.toggle_is_present(in_use=False) + await manager_with_no_cleanup_task._cleanup_unused_identifiers() # noqa: SLF001 + await _assert_all_resources(manager_with_no_cleanup_task, identifiers, exist=False) + + +async def test_no_redis_key_overlap_when_inheriting( + redis_client_sdk: RedisClientSDK, + component_using_random_text: ComponentUsingRandomText, +): + class ChildRandomTextResourcesManager(RandomTextResourcesManager): + ... 
+ + parent_manager = RandomTextResourcesManager( + redis_client_sdk, component_using_random_text + ) + child_manager = ChildRandomTextResourcesManager( + redis_client_sdk, component_using_random_text + ) + + # create an entry in the child and one in the parent + + parent_identifier, _ = await parent_manager.create( + cleanup_context=RandomTextCleanupContext(), length=1 + ) + child_identifier, _ = await child_manager.create( + cleanup_context=RandomTextCleanupContext(), length=1 + ) + assert parent_identifier != child_identifier + + keys = await redis_client_sdk.redis.keys("*") + assert len(keys) == 2 + + # check keys contain the correct prefixes + key_prefixes: set[str] = {k.split(":")[0] for k in keys} + assert key_prefixes == { + RandomTextResourcesManager.class_path, + ChildRandomTextResourcesManager.class_path, + } From 5f882b729f75c21d235092620dc94b1161c1d837 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 11:43:06 +0100 Subject: [PATCH 03/20] renamed --- .../{distributed_identifer.py => distributed_identifier.py} | 0 .../director-v2/tests/unit/test_utils_distributed_identifier.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename services/director-v2/src/simcore_service_director_v2/utils/{distributed_identifer.py => distributed_identifier.py} (100%) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py similarity index 100% rename from services/director-v2/src/simcore_service_director_v2/utils/distributed_identifer.py rename to services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py diff --git a/services/director-v2/tests/unit/test_utils_distributed_identifier.py b/services/director-v2/tests/unit/test_utils_distributed_identifier.py index 85c2078a54b..b86d1d1c5aa 100644 --- a/services/director-v2/tests/unit/test_utils_distributed_identifier.py +++ b/services/director-v2/tests/unit/test_utils_distributed_identifier.py @@ -15,7 +15,7 @@ from servicelib.redis import RedisClientSDK from servicelib.utils import logged_gather from settings_library.redis import RedisDatabase, RedisSettings -from simcore_service_director_v2.utils.distributed_identifer import ( +from simcore_service_director_v2.utils.distributed_identifier import ( BaseDistributedIdentifierManager, ) From 11bb5909d87a6bd72c6454c63bd3c2315f7a4e31 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 11:44:54 +0100 Subject: [PATCH 04/20] api_key name also depends on run_id --- .../dynamic_sidecar/docker_compose_specs.py | 4 +++- .../scheduler/_core/_events_user_services.py | 1 + .../modules/osparc_variables_substitutions.py | 19 +++++++++++++++---- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py index e67c3c45f64..b8215e54ec1 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py @@ -12,7 +12,7 @@ PathMappingsLabel, SimcoreServiceLabels, ) -from models_library.services import ServiceKey, ServiceVersion +from models_library.services import RunID, ServiceKey, ServiceVersion from models_library.services_resources import ( DEFAULT_SINGLE_SERVICE_NAME, 
ResourcesDict, @@ -275,6 +275,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913 node_id: NodeID, simcore_user_agent: str, swarm_stack_name: str, + run_id: RunID, ) -> str: """ returns a docker-compose spec used by @@ -391,6 +392,7 @@ async def assemble_spec( # pylint: disable=too-many-arguments # noqa: PLR0913 product_name=product_name, user_id=user_id, node_id=node_id, + run_id=run_id, ) stringified_service_spec: str = replace_env_vars_in_compose_spec( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py index e63bffe2828..bfcec1548e3 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py @@ -98,6 +98,7 @@ async def create_user_services(app: FastAPI, scheduler_data: SchedulerData): node_id=scheduler_data.node_uuid, simcore_user_agent=scheduler_data.request_simcore_user_agent, swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, + run_id=scheduler_data.run_id, ) _logger.debug( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py index 078c6f9eae2..e1825d8eea3 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py @@ -14,7 +14,7 @@ from models_library.products import ProductName from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID -from models_library.services import ServiceKey, ServiceVersion +from models_library.services import RunID, ServiceKey, ServiceVersion from models_library.users import UserID from models_library.utils.specs_substitution import SpecsSubstitutionsResolver from pydantic import BaseModel, EmailStr @@ -187,6 +187,7 @@ async def resolve_and_substitute_service_lifetime_variables_in_specs( product_name: ProductName, user_id: UserID, node_id: NodeID, + run_id: RunID, safe: bool = True, ) -> dict[str, Any]: registry: OsparcVariablesTable = app.state.lifespan_osparc_variables_table @@ -204,6 +205,7 @@ async def resolve_and_substitute_service_lifetime_variables_in_specs( product_name=product_name, user_id=user_id, node_id=node_id, + run_id=run_id, ), # NOTE: the api key and secret cannot be resolved in parallel # due to race conditions @@ -218,25 +220,35 @@ async def resolve_and_substitute_service_lifetime_variables_in_specs( async def _get_or_create_api_key( - app: FastAPI, product_name: ProductName, user_id: UserID, node_id: NodeID + app: FastAPI, + product_name: ProductName, + user_id: UserID, + node_id: NodeID, + run_id: RunID, ) -> str: key_data = await get_or_create_api_key( app, product_name=product_name, user_id=user_id, node_id=node_id, + run_id=run_id, ) return key_data.api_key # type:ignore [no-any-return] async def _get_or_create_api_secret( - app: FastAPI, product_name: ProductName, user_id: UserID, node_id: NodeID + app: FastAPI, + product_name: ProductName, + user_id: UserID, + node_id: NodeID, + run_id: RunID, ) -> str: key_data = await get_or_create_api_key( app, 
product_name=product_name, user_id=user_id, node_id=node_id, + run_id=run_id, ) return key_data.api_secret # type:ignore [no-any-return] @@ -271,7 +283,6 @@ def _setup_session_osparc_variables(app: FastAPI): ("OSPARC_VARIABLE_PRODUCT_NAME", "product_name"), ("OSPARC_VARIABLE_STUDY_UUID", "project_id"), ("OSPARC_VARIABLE_NODE_ID", "node_id"), - # ANE -> PC: why not register the user_id as well at this point? ]: table.register_from_context(name, context_name) From cbfeffc216c2d2c800e133fdc6e52c3b7d372464 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 14:16:08 +0100 Subject: [PATCH 05/20] refactor --- .../modules/api_keys_manager.py | 124 +++++++++++++----- .../scheduler/_core/_events_utils.py | 5 +- .../utils/distributed_identifier.py | 9 +- 3 files changed, 98 insertions(+), 40 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py index 0fa6f457fe8..066cd1a0e65 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py @@ -6,92 +6,133 @@ from models_library.products import ProductName from models_library.projects_nodes_io import NodeID from models_library.rabbitmq_basic_types import RPCMethodName +from models_library.services import RunID from models_library.users import UserID -from pydantic import parse_obj_as +from pydantic import BaseModel, StrBytes, parse_obj_as from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.redis import RedisClientSDK +from settings_library.redis import RedisDatabase -from ..utils.distributed_identifer import BaseDistributedIdentifierManager +from ..core.settings import AppSettings +from ..utils.distributed_identifier import BaseDistributedIdentifierManager from .rabbitmq import get_rabbitmq_rpc_client -class APIKeysManager(BaseDistributedIdentifierManager[str, ApiKeyGet]): - def __init__(self, app: FastAPI) -> None: - self.GET_OR_CREATE_INJECTS_IDENTIFIER = True +class CleanupContext(BaseModel): + # used for checking if used + node_id: NodeID + + # used for removing + product_name: ProductName + user_id: UserID + + +class APIKeysManager(BaseDistributedIdentifierManager[str, ApiKeyGet, CleanupContext]): + def __init__(self, app: FastAPI, redis_client_sdk: RedisClientSDK) -> None: + super().__init__(redis_client_sdk) self.app = app @property def rpc_client(self) -> RabbitMQRPCClient: return get_rabbitmq_rpc_client(self.app) - # pylint:disable=arguments-differ + @classmethod + def _deserialize_identifier(cls, raw: str) -> str: + return raw + + @classmethod + def _serialize_identifier(cls, identifier: str) -> str: + return identifier + + @classmethod + def _deserialize_cleanup_context(cls, raw: StrBytes) -> CleanupContext: + return CleanupContext.parse_raw(raw) - async def get( # type:ignore [override] + @classmethod + def _serialize_cleanup_context(cls, cleanup_context: CleanupContext) -> str: + return cleanup_context.json() + + async def is_used(self, identifier: str, cleanup_context: CleanupContext) -> bool: + _ = identifier + scheduler: "DynamicSidecarsScheduler" = ( + self.app.state.dynamic_sidecar_scheduler + ) # noqa: F821 + return scheduler.is_service_tracked(cleanup_context.node_id) + + async def _create( # pylint:disable=arguments-differ # type:ignore [override] self, identifier: str, product_name: ProductName, user_id: UserID - ) -> ApiKeyGet | None: + ) -> tuple[str, 
ApiKeyGet]: result = await self.rpc_client.request( WEBSERVER_RPC_NAMESPACE, - parse_obj_as(RPCMethodName, "api_key_get"), + parse_obj_as(RPCMethodName, "create_api_keys"), product_name=product_name, user_id=user_id, - name=identifier, + new=ApiKeyCreate(display_name=identifier, expiration=None), ) - return parse_obj_as(ApiKeyGet | None, result) + return identifier, ApiKeyGet.parse_obj(result) - async def create( # type:ignore [override] + async def get( # pylint:disable=arguments-differ # type:ignore [override] self, identifier: str, product_name: ProductName, user_id: UserID - ) -> tuple[str, ApiKeyGet]: + ) -> ApiKeyGet | None: result = await self.rpc_client.request( WEBSERVER_RPC_NAMESPACE, - parse_obj_as(RPCMethodName, "create_api_keys"), + parse_obj_as(RPCMethodName, "api_key_get"), product_name=product_name, user_id=user_id, - new=ApiKeyCreate(display_name=identifier, expiration=None), + name=identifier, ) - return identifier, ApiKeyGet.parse_obj(result) + return parse_obj_as(ApiKeyGet | None, result) - async def destroy( # type:ignore [override] - self, identifier: str, product_name: ProductName, user_id: UserID - ) -> None: + async def _destroy(self, identifier: str, cleanup_context: CleanupContext) -> None: await self.rpc_client.request( WEBSERVER_RPC_NAMESPACE, parse_obj_as(RPCMethodName, "delete_api_keys"), - product_name=product_name, - user_id=user_id, + product_name=cleanup_context.product_name, + user_id=cleanup_context.user_id, name=identifier, ) async def get_or_create_api_key( - app: FastAPI, *, product_name: ProductName, user_id: UserID, node_id: NodeID + app: FastAPI, + *, + product_name: ProductName, + user_id: UserID, + node_id: NodeID, + run_id: RunID, ) -> ApiKeyGet: api_keys_manager = _get_api_keys_manager(app) - display_name = _get_api_key_name(node_id) + display_name = _get_api_key_name(node_id, run_id) key_data: ApiKeyGet | None = await api_keys_manager.get( identifier=display_name, product_name=product_name, user_id=user_id ) if key_data is None: _, key_data = await api_keys_manager.create( - identifier=display_name, product_name=product_name, user_id=user_id + cleanup_context=CleanupContext( + node_id=node_id, product_name=product_name, user_id=user_id + ), + identifier=display_name, + product_name=product_name, + user_id=user_id, ) return key_data -async def safe_remove( - app: FastAPI, *, node_id: NodeID, product_name: ProductName, user_id: UserID -) -> None: +async def safe_remove(app: FastAPI, *, node_id: NodeID, run_id: RunID) -> None: api_keys_manager = _get_api_keys_manager(app) - display_name = _get_api_key_name(node_id) + display_name = _get_api_key_name(node_id, run_id) - await api_keys_manager.remove( - identifier=display_name, product_name=product_name, user_id=user_id - ) + await api_keys_manager.remove(identifier=display_name) -def _get_api_key_name(node_id: NodeID) -> str: - obfuscated_node_id = uuid5(node_id, f"{node_id}") - return f"_auto_{obfuscated_node_id}" +def _get_api_key_name(node_id: NodeID, run_id: RunID) -> str: + # Generates a new unique key name for each service run + # This avoids race conditions if the service is starting and + # and the cleanup job is removing the key from an old run which + # was wrongly shut down + return f"_auto_{uuid5(node_id, run_id)}" def _get_api_keys_manager(app: FastAPI) -> APIKeysManager: @@ -101,6 +142,19 @@ def setup(app: FastAPI) -> None: async def on_startup() -> None: - app.state.api_keys_manager = APIKeysManager(app) + settings: 
AppSettings = app.state.settings + + redis_dsn = settings.REDIS.build_redis_dsn( + RedisDatabase.DISTRIBUTED_IDENTIFIERS + ) + redis_client_sdk = RedisClientSDK(redis_dsn) + app.state.api_keys_manager = manager = APIKeysManager(app, redis_client_sdk) + + await manager.setup() + + async def on_shutdown() -> None: + manager: APIKeysManager = app.state.api_keys_manager + await manager.shutdown() app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index 0722c8a6faf..99d13983d70 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -310,10 +310,7 @@ async def attempt_pod_removal_and_data_saving( ) await safe_remove( - app, - node_id=scheduler_data.node_uuid, - product_name=scheduler_data.product_name, - user_id=scheduler_data.user_id, + app, node_id=scheduler_data.node_uuid, run_id=scheduler_data.run_id ) # remove sidecar's api client diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py index 2659d1d56f9..f6dc3d2aa8d 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py @@ -208,7 +208,14 @@ def __init__( *, cleanup_interval: timedelta = _DEFAULT_CLEANUP_INTERVAL, ) -> None: - # TODO: add docstring + """ + Arguments: + redis_client_sdk -- client connecting to Redis + + Keyword Arguments: + cleanup_interval -- interval at which cleanup for unused + resources runs (default: {_DEFAULT_CLEANUP_INTERVAL}) + """ if not redis_client_sdk.redis_dsn.endswith( f"{RedisDatabase.DISTRIBUTED_IDENTIFIERS}" From 29970e7f73af2d2a684b2927cc8ca31b7053325d Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 14:16:40 +0100 Subject: [PATCH 06/20] warnings --- .../simcore_service_director_v2/modules/api_keys_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py index 066cd1a0e65..ef283800c49 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py @@ -54,9 +54,9 @@ def _serialize_cleanup_context(cls, cleanup_context: CleanupContext) -> str: async def is_used(self, identifier: str, cleanup_context: CleanupContext) -> bool: _ = identifier - scheduler: "DynamicSidecarsScheduler" = ( + scheduler: "DynamicSidecarsScheduler" = ( # noqa: F821 self.app.state.dynamic_sidecar_scheduler - ) # noqa: F821 + ) return scheduler.is_service_tracked(cleanup_context.node_id) async def _create( # pylint:disable=arguments-differ # type:ignore [override] From 6a24fe66f7e8659e31e20e047dde2e2da1aed0ba Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 14:50:09 +0100 Subject: [PATCH 07/20] fixed and extended tests --- .../unit/test_modules_api_keys_manager.py | 106 ++++++++++++++---- 1 
file changed, 87 insertions(+), 19 deletions(-) diff --git a/services/director-v2/tests/unit/test_modules_api_keys_manager.py b/services/director-v2/tests/unit/test_modules_api_keys_manager.py index 3c56a05936c..479c46f6410 100644 --- a/services/director-v2/tests/unit/test_modules_api_keys_manager.py +++ b/services/director-v2/tests/unit/test_modules_api_keys_manager.py @@ -1,7 +1,9 @@ +# pylint:disable=protected-access # pylint:disable=redefined-outer-name # pylint:disable=unused-argument from collections.abc import Awaitable, Callable +from unittest.mock import AsyncMock import pytest from faker import Faker @@ -10,16 +12,22 @@ from models_library.api_schemas_webserver.auth import ApiKeyCreate, ApiKeyGet from models_library.products import ProductName from models_library.projects_nodes_io import NodeID +from models_library.services import RunID from models_library.users import UserID from pytest_mock import MockerFixture from servicelib.rabbitmq import RabbitMQRPCClient, RPCRouter +from servicelib.redis import RedisClientSDK +from settings_library.redis import RedisDatabase, RedisSettings from simcore_service_director_v2.modules.api_keys_manager import ( APIKeysManager, _get_api_key_name, + get_or_create_api_key, + safe_remove, ) pytest_simcore_core_services_selection = [ "rabbit", + "redis", ] @@ -28,11 +36,33 @@ def node_id(faker: Faker) -> NodeID: return faker.uuid4(cast_to=None) -def test_get_api_key_name_is_not_randomly_generated(node_id: NodeID): - api_key_names = {_get_api_key_name(node_id) for x in range(1000)} +@pytest.fixture +def run_id(faker: Faker) -> RunID: + return RunID(faker.pystr()) + + +@pytest.fixture +def product_name(faker: Faker) -> ProductName: + return faker.pystr() + + +@pytest.fixture +def user_id(faker: Faker) -> UserID: + return faker.pyint() + + +def test_get_api_key_name_is_not_randomly_generated(node_id: NodeID, run_id: RunID): + api_key_names = {_get_api_key_name(node_id, run_id) for _ in range(1000)} assert len(api_key_names) == 1 +@pytest.fixture +def redis_client_sdk(redis_service: RedisSettings) -> RedisClientSDK: + return RedisClientSDK( + redis_service.build_redis_dsn(RedisDatabase.DISTRIBUTED_IDENTIFIERS) + ) + + @pytest.fixture async def mock_rpc_server( rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], @@ -74,27 +104,65 @@ async def delete_api_keys( return rpc_client -async def test_rpc_endpoints( - mock_rpc_server: RabbitMQRPCClient, - faker: Faker, -): - manager = APIKeysManager(FastAPI()) +@pytest.fixture +def app() -> FastAPI: + return FastAPI() + + +@pytest.fixture +def mock_dynamic_sidecars_scheduler(app: FastAPI, is_used: bool) -> None: + scheduler_mock = AsyncMock() + scheduler_mock.is_service_tracked = lambda _: is_used + app.state.dynamic_sidecar_scheduler = scheduler_mock - identifier = faker.pystr() - product_name = faker.pystr() - user_id = faker.pyint() - api_key = await manager.get( - identifier=identifier, product_name=product_name, user_id=user_id +@pytest.fixture +def api_keys_manager( + mock_rpc_server: RabbitMQRPCClient, + mock_dynamic_sidecars_scheduler: None, + redis_client_sdk: RedisClientSDK, + app: FastAPI, +) -> APIKeysManager: + app.state.api_keys_manager = manager = APIKeysManager(app, redis_client_sdk) + return manager + + +@pytest.mark.parametrize("is_used", [True]) +async def test_api_keys_workflow( + api_keys_manager: APIKeysManager, + app: FastAPI, + node_id: NodeID, + run_id: RunID, + product_name: ProductName, + user_id: UserID, +): + api_key = await get_or_create_api_key( + app, 
product_name=product_name, user_id=user_id, node_id=node_id, run_id=run_id ) assert isinstance(api_key, ApiKeyGet) - - identifier, api_key = await manager.create( - identifier=identifier, product_name=product_name, user_id=user_id + assert len(await api_keys_manager._get_tracked()) == 1 # noqa: SLF001 + + await safe_remove(app, node_id=node_id, run_id=run_id) + assert len(await api_keys_manager._get_tracked()) == 0 # noqa: SLF001 + + +@pytest.mark.parametrize("is_used", [False, True]) +async def test_background_cleanup( + api_keys_manager: APIKeysManager, + app: FastAPI, + node_id: NodeID, + run_id: RunID, + product_name: ProductName, + user_id: UserID, + is_used: bool, +) -> None: + api_key = await get_or_create_api_key( + app, product_name=product_name, user_id=user_id, node_id=node_id, run_id=run_id ) - assert isinstance(identifier, str) assert isinstance(api_key, ApiKeyGet) + assert len(await api_keys_manager._get_tracked()) == 1 # noqa: SLF001 - await manager.destroy( - identifier=identifier, product_name=product_name, user_id=user_id - ) + await api_keys_manager._cleanup_unused_identifiers() # noqa: SLF001 + assert ( + len(await api_keys_manager._get_tracked()) == (1 if is_used else 0) + ) # noqa: SLF001 From 144b79594c8525483fb44c277d51bee9fbbbd5db Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 14:54:31 +0100 Subject: [PATCH 08/20] refactor test --- .../tests/unit/test_modules_api_keys_manager.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/services/director-v2/tests/unit/test_modules_api_keys_manager.py b/services/director-v2/tests/unit/test_modules_api_keys_manager.py index 479c46f6410..f6c7fad53d9 100644 --- a/services/director-v2/tests/unit/test_modules_api_keys_manager.py +++ b/services/director-v2/tests/unit/test_modules_api_keys_manager.py @@ -127,6 +127,10 @@ def api_keys_manager( return manager +async def _get_resource_count(api_keys_manager: APIKeysManager) -> int: + return len(await api_keys_manager._get_tracked()) # noqa: SLF001 + + @pytest.mark.parametrize("is_used", [True]) async def test_api_keys_workflow( @@ -140,10 +144,10 @@ async def test_api_keys_workflow( app, product_name=product_name, user_id=user_id, node_id=node_id, run_id=run_id ) assert isinstance(api_key, ApiKeyGet) - assert len(await api_keys_manager._get_tracked()) == 1 # noqa: SLF001 + assert await _get_resource_count(api_keys_manager) == 1 await safe_remove(app, node_id=node_id, run_id=run_id) - assert len(await api_keys_manager._get_tracked()) == 0 # noqa: SLF001 + assert await _get_resource_count(api_keys_manager) == 0 @pytest.mark.parametrize("is_used", [False, True]) @@ -160,9 +164,7 @@ async def test_background_cleanup( app, product_name=product_name, user_id=user_id, node_id=node_id, run_id=run_id ) assert isinstance(api_key, ApiKeyGet) - assert len(await api_keys_manager._get_tracked()) == 1 # noqa: SLF001 + assert await _get_resource_count(api_keys_manager) == 1 await api_keys_manager._cleanup_unused_identifiers() # noqa: SLF001 - assert ( - len(await api_keys_manager._get_tracked()) == (1 if is_used else 0) - ) # noqa: SLF001 + assert await _get_resource_count(api_keys_manager) == (1 if is_used else 0) From 4727f37f9d98c7e549d0a5e073d85db1549ef52b Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 14:59:20 +0100 Subject: [PATCH 09/20] refactor cleanup interval --- .../simcore_service_director_v2/modules/api_keys_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git 
a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py index ef283800c49..ddc96b4f920 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py @@ -1,3 +1,4 @@ +from datetime import timedelta from uuid import uuid5 from fastapi import FastAPI @@ -17,6 +18,8 @@ from ..utils.distributed_identifier import BaseDistributedIdentifierManager from .rabbitmq import get_rabbitmq_rpc_client +_CLEANUP_INTERVAL = timedelta(minutes=5) + class CleanupContext(BaseModel): # used for checking if used @@ -29,7 +32,7 @@ class CleanupContext(BaseModel): class APIKeysManager(BaseDistributedIdentifierManager[str, ApiKeyGet, CleanupContext]): def __init__(self, app: FastAPI, redis_client_sdk: RedisClientSDK) -> None: - super().__init__(redis_client_sdk) + super().__init__(redis_client_sdk, cleanup_interval=_CLEANUP_INTERVAL) self.app = app @property From 66ae1703ee5e518d8a498d24913065df79e44fa6 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 15:09:35 +0100 Subject: [PATCH 10/20] extended docstring --- .../utils/distributed_identifier.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py index f6dc3d2aa8d..8c50bbd64f1 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py @@ -19,12 +19,20 @@ Res = TypeVar("Res") # Provided at the moment of creation. -# Can be used inside ``is_present`` and ``remove``. +# Can be used inside ``is_used`` and ``_destroy``. CleanupContext = TypeVar("CleanupContext") class BaseDistributedIdentifierManager(ABC, Generic[Ident, Res, CleanupContext]): - # TODO: add docstring here + """Used to implement managers for resources that require book keeping + in a distributed system. + + Generics: + Ident -- a user defined object: used to uniquely identify the resource + Res -- a user defined object: referring to an existing resource + CleanupContext -- a user defined object: contains all necessary + arguments used for removal and cleanup. + """ @classmethod @abstractmethod From 1b9e09d31b87a7c26ed7b3468bfd68cadb120a34 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 15:25:06 +0100 Subject: [PATCH 11/20] docstrings --- .../utils/distributed_identifier.py | 52 +++++++++++-------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py index 8c50bbd64f1..45f7f25cfdd 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py @@ -15,28 +15,30 @@ _REDIS_MAX_CONCURRENCY: Final[NonNegativeInt] = 10 _DEFAULT_CLEANUP_INTERVAL: Final[timedelta] = timedelta(minutes=1) -Ident = TypeVar("Ident") -Res = TypeVar("Res") - -# Provided at the moment of creation. -# Can be used inside ``is_used`` and ``_destroy``. 
+Identifier = TypeVar("Identifier") +ResourceObject = TypeVar("ResourceObject") CleanupContext = TypeVar("CleanupContext") -class BaseDistributedIdentifierManager(ABC, Generic[Ident, Res, CleanupContext]): +class BaseDistributedIdentifierManager( + ABC, Generic[Identifier, ResourceObject, CleanupContext] +): """Used to implement managers for resources that require book keeping in a distributed system. + NOTE: that ``Identifier`` and ``ResourceObject`` are serialized and deserialized + to and from Redis. + Generics: - Ident -- a user defined object: used to uniquely identify the resource - Res -- a user defined object: referring to an existing resource + Identifier -- a user defined object: used to uniquely identify the resource + ResourceObject -- a user defined object: referring to an existing resource CleanupContext -- a user defined object: contains all necessary arguments used for removal and cleanup. """ @classmethod @abstractmethod - def _deserialize_identifier(cls, raw: str) -> Ident: + def _deserialize_identifier(cls, raw: str) -> Identifier: """User provided deserialization for the identifier Arguments: @@ -48,7 +50,7 @@ def _deserialize_identifier(cls, raw: str) -> Ident: @classmethod @abstractmethod - def _serialize_identifier(cls, identifier: Ident) -> str: + def _serialize_identifier(cls, identifier: Identifier) -> str: """User provided serialization for the identifier Arguments: @@ -83,7 +85,9 @@ def _serialize_cleanup_context(cls, cleanup_context: CleanupContext) -> str: """ @abstractmethod - async def is_used(self, identifier: Ident, cleanup_context: CleanupContext) -> bool: + async def is_used( + self, identifier: Identifier, cleanup_context: CleanupContext + ) -> bool: """Check if the resource associated to the ``identifier`` is still being used. # NOTE: a resource can be created but not in use. @@ -97,7 +101,7 @@ async def is_used(self, identifier: Ident, cleanup_context: CleanupContext) -> b """ @abstractmethod - async def _create(self, **extra_kwargs) -> tuple[Ident, Res]: + async def _create(self, **extra_kwargs) -> tuple[Identifier, ResourceObject]: """Used INTERNALLY for creating the resources. # NOTE: should not be used directly, use the public version ``create`` instead. @@ -110,7 +114,9 @@ async def _create(self, **extra_kwargs) -> tuple[Ident, Res]: """ @abstractmethod - async def get(self, identifier: Ident, **extra_kwargs) -> Res | None: + async def get( + self, identifier: Identifier, **extra_kwargs + ) -> ResourceObject | None: """If exists, returns the resource. Arguments: @@ -123,7 +129,7 @@ async def get(self, identifier: Ident, **extra_kwargs) -> Res | None: @abstractmethod async def _destroy( - self, identifier: Ident, cleanup_context: CleanupContext + self, identifier: Identifier, cleanup_context: CleanupContext ) -> None: """Used to destroy an existing resource # NOTE: should not be used directly, use the public @@ -138,7 +144,7 @@ async def _destroy( async def create( self, *, cleanup_context: CleanupContext, **extra_kwargs - ) -> tuple[Ident, Res]: + ) -> tuple[Identifier, ResourceObject]: """Used for creating the resources Arguments: @@ -155,7 +161,7 @@ async def create( ) return identifier, result - async def remove(self, identifier: Ident, *, reraise: bool = False) -> None: + async def remove(self, identifier: Identifier, *, reraise: bool = False) -> None: """Attempts to remove the resource, if an error occurs it is logged. 
Arguments: @@ -199,11 +205,11 @@ def _redis_key_prefix(cls) -> str: return f"{cls.class_path}:" @classmethod - def _to_redis_key(cls, identifier: Ident) -> str: + def _to_redis_key(cls, identifier: Identifier) -> str: return f"{cls._redis_key_prefix}{cls._serialize_identifier(identifier)}" @classmethod - def _from_redis_key(cls, redis_key: str) -> Ident: + def _from_redis_key(cls, redis_key: str) -> Identifier: return cls._deserialize_identifier( redis_key.removeprefix(cls._redis_key_prefix) ) @@ -239,14 +245,16 @@ def __init__( self._cleanup_task: Task | None = None - async def _get_identifier_context(self, identifier: Ident) -> CleanupContext | None: + async def _get_identifier_context( + self, identifier: Identifier + ) -> CleanupContext | None: raw: bytes | None = await self.redis_client_sdk.redis.get( self._to_redis_key(identifier) ) return self._deserialize_cleanup_context(raw) if raw else None - async def _get_tracked(self) -> dict[Ident, CleanupContext]: - identifiers: list[Ident] = [ + async def _get_tracked(self) -> dict[Identifier, CleanupContext]: + identifiers: list[Identifier] = [ self._from_redis_key(redis_key) for redis_key in await self.redis_client_sdk.redis.keys( f"{self._redis_key_prefix}*" @@ -270,7 +278,7 @@ async def _get_tracked(self) -> dict[Ident, CleanupContext]: async def _cleanup_unused_identifiers(self) -> None: # removes no longer used identifiers - tracked_data: dict[Ident, CleanupContext] = await self._get_tracked() + tracked_data: dict[Identifier, CleanupContext] = await self._get_tracked() _logger.info("Will remove unused %s", list(tracked_data.keys())) for identifier, cleanup_context in tracked_data.items(): From 09075c22ab2ca4b28607779179dabbde83f718bf Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 15:40:28 +0100 Subject: [PATCH 12/20] mypy --- .../modules/api_keys_manager.py | 8 +- .../utils/distributed_identifier.py | 285 +++++++++--------- .../unit/test_utils_distributed_identifier.py | 4 +- 3 files changed, 144 insertions(+), 153 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py index ddc96b4f920..1b75f06ce5d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py @@ -57,12 +57,12 @@ def _serialize_cleanup_context(cls, cleanup_context: CleanupContext) -> str: async def is_used(self, identifier: str, cleanup_context: CleanupContext) -> bool: _ = identifier - scheduler: "DynamicSidecarsScheduler" = ( # noqa: F821 + scheduler: "DynamicSidecarsScheduler" = ( # type:ignore [name-defined] # noqa: F821 self.app.state.dynamic_sidecar_scheduler ) - return scheduler.is_service_tracked(cleanup_context.node_id) + return bool(scheduler.is_service_tracked(cleanup_context.node_id)) - async def _create( # pylint:disable=arguments-differ # type:ignore [override] + async def _create( # type:ignore [override] # pylint:disable=arguments-differ self, identifier: str, product_name: ProductName, user_id: UserID ) -> tuple[str, ApiKeyGet]: result = await self.rpc_client.request( @@ -74,7 +74,7 @@ async def _create( # pylint:disable=arguments-differ # type:ignore [override] ) return identifier, ApiKeyGet.parse_obj(result) - async def get( # pylint:disable=arguments-differ # type:ignore [override] + async def get( # type:ignore [override] # pylint:disable=arguments-differ self, identifier: str, 
product_name: ProductName, user_id: UserID
     ) -> ApiKeyGet | None:
         result = await self.rpc_client.request(
diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py
index 45f7f25cfdd..0b26d53cdfc 100644
--- a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py
+++ b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py
@@ -36,6 +36,144 @@ class BaseDistributedIdentifierManager(
     arguments used for removal and cleanup.
     """
 
+    def __init__(
+        self,
+        redis_client_sdk: RedisClientSDK,
+        *,
+        cleanup_interval: timedelta = _DEFAULT_CLEANUP_INTERVAL,
+    ) -> None:
+        """
+        Arguments:
+            redis_client_sdk -- client connecting to Redis
+
+        Keyword Arguments:
+            cleanup_interval -- interval at which cleanup for unused
+                resources runs (default: {_DEFAULT_CLEANUP_INTERVAL})
+        """
+
+        if not redis_client_sdk.redis_dsn.endswith(
+            f"{RedisDatabase.DISTRIBUTED_IDENTIFIERS}"
+        ):
+            msg = (
+                f"Redis endpoint {redis_client_sdk.redis_dsn} contains the wrong database. "
+                f"Expected {RedisDatabase.DISTRIBUTED_IDENTIFIERS}"
+            )
+            raise TypeError(msg)
+
+        self.redis_client_sdk = redis_client_sdk
+        self.cleanup_interval = cleanup_interval
+
+        self._cleanup_task: Task | None = None
+
+    async def setup(self) -> None:
+        self._cleanup_task = start_periodic_task(
+            self._cleanup_unused_identifiers,
+            interval=self.cleanup_interval,
+            task_name="cleanup_unused_identifiers_task",
+        )
+
+    async def shutdown(self) -> None:
+        if self._cleanup_task:
+            await stop_periodic_task(self._cleanup_task, timeout=5)
+
+    @classmethod
+    def class_path(cls) -> str:
+        return f"{cls.__module__}.{cls.__name__}"
+
+    @classmethod
+    def _redis_key_prefix(cls) -> str:
+        return f"{cls.class_path()}:"
+
+    @classmethod
+    def _to_redis_key(cls, identifier: Identifier) -> str:
+        return f"{cls._redis_key_prefix()}{cls._serialize_identifier(identifier)}"
+
+    @classmethod
+    def _from_redis_key(cls, redis_key: str) -> Identifier:
+        raw_identifier = redis_key.removeprefix(cls._redis_key_prefix())
+        return cls._deserialize_identifier(raw_identifier)
+
+    async def _get_identifier_context(
+        self, identifier: Identifier
+    ) -> CleanupContext | None:
+        raw: bytes | None = await self.redis_client_sdk.redis.get(
+            self._to_redis_key(identifier)
+        )
+        return self._deserialize_cleanup_context(raw.decode()) if raw else None
+
+    async def _get_tracked(self) -> dict[Identifier, CleanupContext]:
+        identifiers: list[Identifier] = [
+            self._from_redis_key(redis_key)
+            for redis_key in await self.redis_client_sdk.redis.keys(
+                f"{self._redis_key_prefix()}*"
+            )
+        ]
+
+        cleanup_contexts: list[CleanupContext | None] = await logged_gather(
+            *(self._get_identifier_context(identifier) for identifier in identifiers),
+            max_concurrency=_REDIS_MAX_CONCURRENCY,
+        )
+
+        return {
+            identifier: cleanup_context
+            for identifier, cleanup_context in zip(
+                identifiers, cleanup_contexts, strict=True
+            )
+            # NOTE: cleanup_context will be None if the key was removed before
+            # recovering all the cleanup_contexts
+            if cleanup_context is not None
+        }
+
+    async def _cleanup_unused_identifiers(self) -> None:
+        # removes no longer used identifiers
+        tracked_data: dict[Identifier, CleanupContext] = await self._get_tracked()
+        _logger.info("Will remove unused %s", list(tracked_data.keys()))
+
+        for identifier, cleanup_context in tracked_data.items():
+            if await self.is_used(identifier, cleanup_context):
+                continue
+
+            await self.remove(identifier)
+
+    async def create(
+        self, *, cleanup_context: CleanupContext, **extra_kwargs
+    ) -> tuple[Identifier, ResourceObject]:
+        """Used for creating the resources
+
+        Arguments:
+            cleanup_context -- user defined CleanupContext object
+            **extra_kwargs -- can be overloaded by the user
+
+        Returns:
+            tuple[identifier for the resource, resource object]
+        """
+        identifier, result = await self._create(**extra_kwargs)
+        await self.redis_client_sdk.redis.set(
+            self._to_redis_key(identifier),
+            self._serialize_cleanup_context(cleanup_context),
+        )
+        return identifier, result
+
+    async def remove(self, identifier: Identifier, *, reraise: bool = False) -> None:
+        """Attempts to remove the resource; if an error occurs, it is logged.
+
+        Arguments:
+            identifier -- user chosen identifier for the resource
+            reraise -- when True raises any exception raised by ``_destroy`` (default: {False})
+        """
+
+        cleanup_context = await self._get_identifier_context(identifier)
+        if cleanup_context is None:
+            _logger.warning(
+                "Something went wrong; did not find any context for %s", identifier
+            )
+            return
+
+        with log_context(
+            _logger, logging.DEBUG, f"{self.__class__}: removing {identifier}"
+        ), log_catch(_logger, reraise=reraise):
+            await self._destroy(identifier, cleanup_context)
+
     @classmethod
     @abstractmethod
     def _deserialize_identifier(cls, raw: str) -> Identifier:
@@ -139,150 +277,3 @@ async def _destroy(
         identifier -- user chosen identifier for the resource
         cleanup_context -- user defined CleanupContext object
     """
-
-    # PUBLIC
-
-    async def create(
-        self, *, cleanup_context: CleanupContext, **extra_kwargs
-    ) -> tuple[Identifier, ResourceObject]:
-        """Used for creating the resources
-
-        Arguments:
-            cleanup_context -- user defined CleanupContext object
-            **extra_kwargs -- can be overloaded by the user
-
-        Returns:
-            tuple[identifier for the resource, resource object]
-        """
-        identifier, result = await self._create(**extra_kwargs)
-        await self.redis_client_sdk.redis.set(
-            self._to_redis_key(identifier),
-            self._serialize_cleanup_context(cleanup_context),
-        )
-        return identifier, result
-
-    async def remove(self, identifier: Identifier, *, reraise: bool = False) -> None:
-        """Attempts to remove the resource, if an error occurs it is logged.
- - Arguments: - identifier -- user chosen identifier for the resource - reraise -- when True raises any exception raised by ``destroy`` (default: {False}) - """ - - cleanup_context = await self._get_identifier_context(identifier) - if cleanup_context is None: - _logger.warning( - "Something went wrong, did not find any context for %s", identifier - ) - return - - with log_context( - _logger, logging.DEBUG, f"{self.__class__}: removing {identifier}" - ), log_catch(_logger, reraise=reraise): - await self._destroy(identifier, cleanup_context) - - async def setup(self) -> None: - self._cleanup_task = start_periodic_task( - self._cleanup_unused_identifiers, - interval=self.cleanup_interval, - task_name="cleanup_unused_identifiers_task", - ) - - async def shutdown(self) -> None: - if self._cleanup_task: - await stop_periodic_task(self._cleanup_task, timeout=5) - - # UTILS - - @classmethod - @property - def class_path(cls) -> str: - return f"{cls.__module__}.{cls.__name__}" - - @classmethod - @property - def _redis_key_prefix(cls) -> str: - return f"{cls.class_path}:" - - @classmethod - def _to_redis_key(cls, identifier: Identifier) -> str: - return f"{cls._redis_key_prefix}{cls._serialize_identifier(identifier)}" - - @classmethod - def _from_redis_key(cls, redis_key: str) -> Identifier: - return cls._deserialize_identifier( - redis_key.removeprefix(cls._redis_key_prefix) - ) - - # CORE - - def __init__( - self, - redis_client_sdk: RedisClientSDK, - *, - cleanup_interval: timedelta = _DEFAULT_CLEANUP_INTERVAL, - ) -> None: - """ - Arguments: - redis_client_sdk -- client connecting to Redis - - Keyword Arguments: - cleanup_interval -- interval at which cleanup for unused - resources runs (default: {_DEFAULT_CLEANUP_INTERVAL}) - """ - - if not redis_client_sdk.redis_dsn.endswith( - f"{RedisDatabase.DISTRIBUTED_IDENTIFIERS}" - ): - msg = ( - f"Redis endpoint {redis_client_sdk.redis_dsn} contains the wrong database." 
- f"Expected {RedisDatabase.DISTRIBUTED_IDENTIFIERS}" - ) - raise TypeError(msg) - - self.redis_client_sdk = redis_client_sdk - self.cleanup_interval = cleanup_interval - - self._cleanup_task: Task | None = None - - async def _get_identifier_context( - self, identifier: Identifier - ) -> CleanupContext | None: - raw: bytes | None = await self.redis_client_sdk.redis.get( - self._to_redis_key(identifier) - ) - return self._deserialize_cleanup_context(raw) if raw else None - - async def _get_tracked(self) -> dict[Identifier, CleanupContext]: - identifiers: list[Identifier] = [ - self._from_redis_key(redis_key) - for redis_key in await self.redis_client_sdk.redis.keys( - f"{self._redis_key_prefix}*" - ) - ] - - cleanup_contexts: list[CleanupContext | None] = await logged_gather( - *(self._get_identifier_context(identifier) for identifier in identifiers), - max_concurrency=_REDIS_MAX_CONCURRENCY, - ) - - return { - identifier: cleanup_context - for identifier, cleanup_context in zip( - identifiers, cleanup_contexts, strict=True - ) - # NOTE: cleanup_context will be None if the key was removed before - # recovering all the cleanup_contexts - if cleanup_context is not None - } - - async def _cleanup_unused_identifiers(self) -> None: - # removes no longer used identifiers - tracked_data: dict[Identifier, CleanupContext] = await self._get_tracked() - _logger.info("Will remove unused %s", list(tracked_data.keys())) - - for identifier, cleanup_context in tracked_data.items(): - if await self.is_used(identifier, cleanup_context): - continue - - await self.remove(identifier) diff --git a/services/director-v2/tests/unit/test_utils_distributed_identifier.py b/services/director-v2/tests/unit/test_utils_distributed_identifier.py index b86d1d1c5aa..68be4d79cd8 100644 --- a/services/director-v2/tests/unit/test_utils_distributed_identifier.py +++ b/services/director-v2/tests/unit/test_utils_distributed_identifier.py @@ -344,6 +344,6 @@ class ChildRandomTextResourcesManager(RandomTextResourcesManager): # check keys contain the correct prefixes key_prefixes: set[str] = {k.split(":")[0] for k in keys} assert key_prefixes == { - RandomTextResourcesManager.class_path, - ChildRandomTextResourcesManager.class_path, + RandomTextResourcesManager.class_path(), + ChildRandomTextResourcesManager.class_path(), } From 77d27b4adedd6f6b65acffab337fa35ebb8a4ecb Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 16:09:47 +0100 Subject: [PATCH 13/20] cleanup naming --- .../unit/test_utils_distributed_identifier.py | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/services/director-v2/tests/unit/test_utils_distributed_identifier.py b/services/director-v2/tests/unit/test_utils_distributed_identifier.py index 68be4d79cd8..b26f601a6b2 100644 --- a/services/director-v2/tests/unit/test_utils_distributed_identifier.py +++ b/services/director-v2/tests/unit/test_utils_distributed_identifier.py @@ -60,8 +60,10 @@ def create(cls, length: int) -> "RandomTextEntry": return cls(text=text) -# mocked api interface class RandomTextAPI: + # Emulates an external API + # used to create resources + def __init__(self) -> None: self._created: dict[UserDefinedID, RandomTextEntry] = {} @@ -79,7 +81,8 @@ def get(self, identifier: UserDefinedID) -> RandomTextEntry | None: @dataclass class ComponentUsingRandomText: - # this one is tracking if the above resources are bring used + # Emulates another component in the system + # using the created resources _in_use: bool = True @@ -87,24 +90,28 @@ def 
is_used(self, an_id: UserDefinedID) -> bool:
         _ = an_id
         return self._in_use
 
-    def toggle_is_present(self, in_use: bool) -> None:
+    def toggle_usage(self, in_use: bool) -> None:
         self._in_use = in_use
 
 
-class RandomTextCleanupContext(BaseModel):
-    # when cleaning up, extra parameters might be required
-    # in this case there aren't any since they are not required.
+class AnEmptyTextCleanupContext(BaseModel):
+    # nothing is required during cleanup, so the context
+    # is an empty object.
+    # A ``pydantic.BaseModel`` is used for convenience;
+    # this could also have inherited from ``object``
     ...
 
 
-# define a custom manager using the custom user defined identifiers
-# NOTE: note that the generic uses `[UserDefinedID, RandomTextEntry, RandomTextCleanupContext]`
-# which enforces typing constraints on the overloaded abstract methods
 class RandomTextResourcesManager(
     BaseDistributedIdentifierManager[
-        UserDefinedID, RandomTextEntry, RandomTextCleanupContext
+        UserDefinedID, RandomTextEntry, AnEmptyTextCleanupContext
     ]
 ):
+    # Implements a resource manager for handling the lifecycle of
+    # resources created by a service.
+    # It also comes with automatic cleanup in case the service owning
+    # the resources failed to remove them in the past.
+
     def __init__(
         self,
         redis_client_sdk: RedisClientSDK,
@@ -125,21 +132,23 @@ def _serialize_identifier(cls, identifier: UserDefinedID) -> str:
         return f"{identifier._id}"  # noqa: SLF001
 
     @classmethod
-    def _deserialize_cleanup_context(cls, raw: StrBytes) -> RandomTextCleanupContext:
-        return RandomTextCleanupContext.parse_raw(raw)
+    def _deserialize_cleanup_context(cls, raw: StrBytes) -> AnEmptyTextCleanupContext:
+        return AnEmptyTextCleanupContext.parse_raw(raw)
 
     @classmethod
     def _serialize_cleanup_context(
-        cls, cleanup_context: RandomTextCleanupContext
+        cls, cleanup_context: AnEmptyTextCleanupContext
     ) -> str:
         return cleanup_context.json()
 
     async def is_used(
-        self, identifier: UserDefinedID, cleanup_context: RandomTextCleanupContext
+        self, identifier: UserDefinedID, cleanup_context: AnEmptyTextCleanupContext
     ) -> bool:
         _ = cleanup_context
         return self.component_using_random_text.is_used(identifier)
 
+    # NOTE: it is intended for the user to override the **kwargs with custom names
+    # to provide a cleaner interface; tooling will complain slightly
     async def _create(  # pylint:disable=arguments-differ # type:ignore [override]
         self, length: int
     ) -> tuple[UserDefinedID, RandomTextEntry]:
@@ -149,7 +158,7 @@ async def get(self, identifier: UserDefinedID, **_) -> RandomTextEntry | None:
         return self.api.get(identifier)
 
     async def _destroy(
-        self, identifier: UserDefinedID, _: RandomTextCleanupContext
+        self, identifier: UserDefinedID, _: AnEmptyTextCleanupContext
     ) -> None:
         self.api.delete(identifier)
 
@@ -208,7 +217,7 @@ async def test_full_workflow(
 ):
     # creation
     identifier, _ = await manager.create(
-        cleanup_context=RandomTextCleanupContext(), length=1
+        cleanup_context=AnEmptyTextCleanupContext(), length=1
     )
     assert await manager.get(identifier) is not None
 
@@ -240,7 +249,7 @@ async def test_remove_raises_error(
 
     # after creation object is present
     identifier, _ = await manager.create(
-        cleanup_context=RandomTextCleanupContext(), length=1
+        cleanup_context=AnEmptyTextCleanupContext(), length=1
     )
     assert await manager.get(identifier) is not None
 
@@ -259,7 +268,7 @@ async def _create_resources(
 ) -> list[UserDefinedID]:
     creation_results: list[tuple[UserDefinedID, RandomTextEntry]] = await logged_gather(
         *[
-            manager.create(cleanup_context=RandomTextCleanupContext(), length=1)
+ manager.create(cleanup_context=AnEmptyTextCleanupContext(), length=1) for _ in range(count) ], max_concurrency=_MAX_REDIS_CONCURRENCY, @@ -309,7 +318,7 @@ async def test_background_removal_of_unused_resources( await _assert_all_resources(manager_with_no_cleanup_task, identifiers, exist=True) # make resources unused in external system - component_using_random_text.toggle_is_present(in_use=False) + component_using_random_text.toggle_usage(in_use=False) await manager_with_no_cleanup_task._cleanup_unused_identifiers() # noqa: SLF001 await _assert_all_resources(manager_with_no_cleanup_task, identifiers, exist=False) @@ -331,10 +340,10 @@ class ChildRandomTextResourcesManager(RandomTextResourcesManager): # create an entry in the child and one in the parent parent_identifier, _ = await parent_manager.create( - cleanup_context=RandomTextCleanupContext(), length=1 + cleanup_context=AnEmptyTextCleanupContext(), length=1 ) child_identifier, _ = await child_manager.create( - cleanup_context=RandomTextCleanupContext(), length=1 + cleanup_context=AnEmptyTextCleanupContext(), length=1 ) assert parent_identifier != child_identifier From 2b210f22252309f98db1fde60162c8487d91d33a Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 16:11:42 +0100 Subject: [PATCH 14/20] fixed wrong type --- .../utils/distributed_identifier.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py index 0b26d53cdfc..54bf998165a 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py @@ -96,10 +96,10 @@ def _from_redis_key(cls, redis_key: str) -> Identifier: async def _get_identifier_context( self, identifier: Identifier ) -> CleanupContext | None: - raw: bytes | None = await self.redis_client_sdk.redis.get( + raw: str | None = await self.redis_client_sdk.redis.get( self._to_redis_key(identifier) ) - return self._deserialize_cleanup_context(raw.decode()) if raw else None + return self._deserialize_cleanup_context(raw) if raw else None async def _get_tracked(self) -> dict[Identifier, CleanupContext]: identifiers: list[Identifier] = [ From 14d2a19ecefe17c826c8977118ed3d778d1f0018 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 Nov 2023 16:38:57 +0100 Subject: [PATCH 15/20] fixed bug and tests --- .../modules/api_keys_manager.py | 11 ++++++----- .../utils/distributed_identifier.py | 2 ++ .../unit/test_modules_api_keys_manager.py | 19 ++++++++++++++----- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py index 1b75f06ce5d..fb1a521e1e5 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py @@ -1,4 +1,5 @@ from datetime import timedelta +from typing import Any from uuid import uuid5 from fastapi import FastAPI @@ -77,7 +78,7 @@ async def _create( # type:ignore [override] # pylint:disable=arguments-differ async def get( # type:ignore [override] # pylint:disable=arguments-differ self, identifier: str, product_name: ProductName, user_id: UserID ) -> ApiKeyGet | None: - result = await 
self.rpc_client.request( + result: Any | None = await self.rpc_client.request( WEBSERVER_RPC_NAMESPACE, parse_obj_as(RPCMethodName, "api_key_get"), product_name=product_name, @@ -107,11 +108,11 @@ async def get_or_create_api_key( api_keys_manager = _get_api_keys_manager(app) display_name = _get_api_key_name(node_id, run_id) - key_data: ApiKeyGet | None = await api_keys_manager.get( + api_key: ApiKeyGet | None = await api_keys_manager.get( identifier=display_name, product_name=product_name, user_id=user_id ) - if key_data is None: - _, key_data = await api_keys_manager.create( + if api_key is None: + _, api_key = await api_keys_manager.create( cleanup_context=CleanupContext( node_id=node_id, product_name=product_name, user_id=user_id ), @@ -120,7 +121,7 @@ async def get_or_create_api_key( user_id=user_id, ) - return key_data + return api_key async def safe_remove(app: FastAPI, *, node_id: NodeID, run_id: RunID) -> None: diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py index 54bf998165a..f90c7522d91 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py @@ -174,6 +174,8 @@ async def remove(self, identifier: Identifier, *, reraise: bool = False) -> None ), log_catch(_logger, reraise=reraise): await self._destroy(identifier, cleanup_context) + await self.redis_client_sdk.redis.delete(self._to_redis_key(identifier)) + @classmethod @abstractmethod def _deserialize_identifier(cls, raw: str) -> Identifier: diff --git a/services/director-v2/tests/unit/test_modules_api_keys_manager.py b/services/director-v2/tests/unit/test_modules_api_keys_manager.py index f6c7fad53d9..2e9da785228 100644 --- a/services/director-v2/tests/unit/test_modules_api_keys_manager.py +++ b/services/director-v2/tests/unit/test_modules_api_keys_manager.py @@ -67,6 +67,7 @@ def redis_client_sdk(redis_service: RedisSettings) -> RedisClientSDK: async def mock_rpc_server( rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], mocker: MockerFixture, + faker: Faker, ) -> RabbitMQRPCClient: rpc_client = await rabbitmq_rpc_client("client") rpc_server = await rabbitmq_rpc_client("mock_server") @@ -75,17 +76,25 @@ async def mock_rpc_server( # mocks the interface defined in the webserver + _storage: dict[str, ApiKeyGet] = {} + @router.expose() async def api_key_get( product_name: ProductName, user_id: UserID, name: str - ) -> ApiKeyGet: - return ApiKeyGet.parse_obj(ApiKeyGet.Config.schema_extra["examples"][0]) + ) -> ApiKeyGet | None: + return _storage.get(f"{product_name}{user_id}", None) @router.expose() async def create_api_keys( product_name: ProductName, user_id: UserID, new: ApiKeyCreate ) -> ApiKeyGet: - return ApiKeyGet.parse_obj(ApiKeyGet.Config.schema_extra["examples"][0]) + api_key = ApiKeyGet( + display_name=new.display_name, + api_key=faker.pystr(), + api_secret=faker.pystr(), + ) + _storage[f"{product_name}{user_id}"] = api_key + return api_key @router.expose() async def delete_api_keys( @@ -112,7 +121,7 @@ def app() -> FastAPI: @pytest.fixture def mock_dynamic_sidecars_scheduler(app: FastAPI, is_used: bool) -> None: scheduler_mock = AsyncMock() - scheduler_mock.is_service_tracked = lambda: is_used + scheduler_mock.is_service_tracked = lambda _: is_used app.state.dynamic_sidecar_scheduler = scheduler_mock @@ -167,4 +176,4 @@ async def 
test_background_cleanup( assert await _get_resource_count(api_keys_manager) == 1 await api_keys_manager._cleanup_unused_identifiers() # noqa: SLF001 - assert await _get_resource_count(api_keys_manager) == 1 if is_used else 0 + assert await _get_resource_count(api_keys_manager) == (1 if is_used else 0) From 196720c5dcc330a1619c12b1e8640b8089a44bc2 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 20 Nov 2023 10:23:17 +0100 Subject: [PATCH 16/20] module rename --- .../src/simcore_service_director_v2/modules/api_keys_manager.py | 2 +- ...distributed_identifier.py => base_distributed_identifier.py} | 0 .../director-v2/tests/unit/test_utils_distributed_identifier.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename services/director-v2/src/simcore_service_director_v2/utils/{distributed_identifier.py => base_distributed_identifier.py} (100%) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py index fb1a521e1e5..618458446f7 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py @@ -16,7 +16,7 @@ from settings_library.redis import RedisDatabase from ..core.settings import AppSettings -from ..utils.distributed_identifier import BaseDistributedIdentifierManager +from ..utils.base_distributed_identifier import BaseDistributedIdentifierManager from .rabbitmq import get_rabbitmq_rpc_client _CLEANUP_INTERVAL = timedelta(minutes=5) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py similarity index 100% rename from services/director-v2/src/simcore_service_director_v2/utils/distributed_identifier.py rename to services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py diff --git a/services/director-v2/tests/unit/test_utils_distributed_identifier.py b/services/director-v2/tests/unit/test_utils_distributed_identifier.py index b26f601a6b2..ce200feef97 100644 --- a/services/director-v2/tests/unit/test_utils_distributed_identifier.py +++ b/services/director-v2/tests/unit/test_utils_distributed_identifier.py @@ -15,7 +15,7 @@ from servicelib.redis import RedisClientSDK from servicelib.utils import logged_gather from settings_library.redis import RedisDatabase, RedisSettings -from simcore_service_director_v2.utils.distributed_identifier import ( +from simcore_service_director_v2.utils.base_distributed_identifier import ( BaseDistributedIdentifierManager, ) From 88cc2d18e83ee67a014281c1d46297b9135b2d21 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 20 Nov 2023 10:24:24 +0100 Subject: [PATCH 17/20] making private --- .../utils/base_distributed_identifier.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py index f90c7522d91..7700811c1a5 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py @@ -60,7 +60,7 @@ def __init__( ) raise TypeError(msg) - self.redis_client_sdk = redis_client_sdk + self._redis_client_sdk = redis_client_sdk 
self.cleanup_interval = cleanup_interval
 
         self._cleanup_task: Task | None = None
 
@@ -96,7 +96,7 @@ def _from_redis_key(cls, redis_key: str) -> Identifier:
     async def _get_identifier_context(
         self, identifier: Identifier
     ) -> CleanupContext | None:
-        raw: str | None = await self.redis_client_sdk.redis.get(
+        raw: str | None = await self._redis_client_sdk.redis.get(
             self._to_redis_key(identifier)
         )
         return self._deserialize_cleanup_context(raw) if raw else None
@@ -104,7 +104,7 @@ async def _get_identifier_context(
     async def _get_tracked(self) -> dict[Identifier, CleanupContext]:
         identifiers: list[Identifier] = [
             self._from_redis_key(redis_key)
-            for redis_key in await self.redis_client_sdk.redis.keys(
+            for redis_key in await self._redis_client_sdk.redis.keys(
                 f"{self._redis_key_prefix()}*"
             )
         ]
@@ -148,7 +148,7 @@ async def create(
             tuple[identifier for the resource, resource object]
         """
         identifier, result = await self._create(**extra_kwargs)
-        await self.redis_client_sdk.redis.set(
+        await self._redis_client_sdk.redis.set(
             self._to_redis_key(identifier),
             self._serialize_cleanup_context(cleanup_context),
         )
@@ -174,7 +174,7 @@ async def remove(self, identifier: Identifier, *, reraise: bool = False) -> None
         ), log_catch(_logger, reraise=reraise):
             await self._destroy(identifier, cleanup_context)
 
-        await self.redis_client_sdk.redis.delete(self._to_redis_key(identifier))
+        await self._redis_client_sdk.redis.delete(self._to_redis_key(identifier))

From 766b140488a4b8d71ffc6d33962a04dace0c3385 Mon Sep 17 00:00:00 2001
From: Andrei Neagu
Date: Mon, 20 Nov 2023 10:52:20 +0100
Subject: [PATCH 18/20] fixed comment

---
 .../utils/base_distributed_identifier.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py
index 7700811c1a5..b52db358631 100644
--- a/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py
+++ b/services/director-v2/src/simcore_service_director_v2/utils/base_distributed_identifier.py
@@ -26,7 +26,7 @@ class BaseDistributedIdentifierManager(
     """Used to implement managers for resources that require book keeping
     in a distributed system.
 
-    NOTE: that ``Identifier`` and ``ResourceObject`` are serialized and deserialized
+    NOTE: ``Identifier`` and ``CleanupContext`` are serialized and deserialized
     to and from Redis.
Generics: From 5c3d4edf54ae38af28ebfe51a670f302854fd939 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 20 Nov 2023 10:54:58 +0100 Subject: [PATCH 19/20] trying to use multiline --- services/autoscaling/tests/manual/docker-compose.yml | 10 +++++++++- services/clusters-keeper/docker-compose.yml | 10 +++++++++- services/docker-compose-ops.yml | 10 +++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/services/autoscaling/tests/manual/docker-compose.yml b/services/autoscaling/tests/manual/docker-compose.yml index e70eb62ca3c..1596fdeef7c 100644 --- a/services/autoscaling/tests/manual/docker-compose.yml +++ b/services/autoscaling/tests/manual/docker-compose.yml @@ -40,7 +40,15 @@ services: ports: - "18081:8081" environment: - - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5 + - >+ + REDIS_HOSTS= + resources:${REDIS_HOST}:${REDIS_PORT}:0, + locks:${REDIS_HOST}:${REDIS_PORT}:1, + validation_codes:${REDIS_HOST}:${REDIS_PORT}:2, + scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3, + user_notifications:${REDIS_HOST}:${REDIS_PORT}:4, + announcements:${REDIS_HOST}:${REDIS_PORT}:5, + distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6 # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml autoscaling: diff --git a/services/clusters-keeper/docker-compose.yml b/services/clusters-keeper/docker-compose.yml index 7ab1543d273..fbb21abfd0f 100644 --- a/services/clusters-keeper/docker-compose.yml +++ b/services/clusters-keeper/docker-compose.yml @@ -35,7 +35,15 @@ services: ports: - "18081:8081" environment: - - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5 + - >+ + REDIS_HOSTS= + resources:${REDIS_HOST}:${REDIS_PORT}:0, + locks:${REDIS_HOST}:${REDIS_PORT}:1, + validation_codes:${REDIS_HOST}:${REDIS_PORT}:2, + scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3, + user_notifications:${REDIS_HOST}:${REDIS_PORT}:4, + announcements:${REDIS_HOST}:${REDIS_PORT}:5, + distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6 # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml clusters-keeper: diff --git a/services/docker-compose-ops.yml b/services/docker-compose-ops.yml index 57ba3575f14..777ebf4edab 100644 --- a/services/docker-compose-ops.yml +++ b/services/docker-compose-ops.yml @@ -86,7 +86,15 @@ services: image: rediscommander/redis-commander:latest init: true environment: - - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5,distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6 + - >+ + REDIS_HOSTS= + resources:${REDIS_HOST}:${REDIS_PORT}:0, + locks:${REDIS_HOST}:${REDIS_PORT}:1, + validation_codes:${REDIS_HOST}:${REDIS_PORT}:2, + scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3, + user_notifications:${REDIS_HOST}:${REDIS_PORT}:4, + announcements:${REDIS_HOST}:${REDIS_PORT}:5, + 
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6 # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml ports: - "18081:8081" From ecdaa98ba7fce8f83e13f8106cc5d46e9bc5c13e Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 20 Nov 2023 10:56:43 +0100 Subject: [PATCH 20/20] stripping spaces --- services/autoscaling/tests/manual/docker-compose.yml | 2 +- services/clusters-keeper/docker-compose.yml | 2 +- services/docker-compose-ops.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/services/autoscaling/tests/manual/docker-compose.yml b/services/autoscaling/tests/manual/docker-compose.yml index 1596fdeef7c..7dbcf5bd4cc 100644 --- a/services/autoscaling/tests/manual/docker-compose.yml +++ b/services/autoscaling/tests/manual/docker-compose.yml @@ -40,7 +40,7 @@ services: ports: - "18081:8081" environment: - - >+ + - >- REDIS_HOSTS= resources:${REDIS_HOST}:${REDIS_PORT}:0, locks:${REDIS_HOST}:${REDIS_PORT}:1, diff --git a/services/clusters-keeper/docker-compose.yml b/services/clusters-keeper/docker-compose.yml index fbb21abfd0f..1d8d5126194 100644 --- a/services/clusters-keeper/docker-compose.yml +++ b/services/clusters-keeper/docker-compose.yml @@ -35,7 +35,7 @@ services: ports: - "18081:8081" environment: - - >+ + - >- REDIS_HOSTS= resources:${REDIS_HOST}:${REDIS_PORT}:0, locks:${REDIS_HOST}:${REDIS_PORT}:1, diff --git a/services/docker-compose-ops.yml b/services/docker-compose-ops.yml index 777ebf4edab..3dc6450d649 100644 --- a/services/docker-compose-ops.yml +++ b/services/docker-compose-ops.yml @@ -86,7 +86,7 @@ services: image: rediscommander/redis-commander:latest init: true environment: - - >+ + - >- REDIS_HOSTS= resources:${REDIS_HOST}:${REDIS_PORT}:0, locks:${REDIS_HOST}:${REDIS_PORT}:1,
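
For reference, below is a minimal sketch of how a concrete manager plugs into the final ``BaseDistributedIdentifierManager`` API from this series. It is illustrative only: ``FakeAPI``, ``FakeCleanupContext`` and ``FakeResourcesManager`` are hypothetical stand-ins, and the DSN wiring through ``RedisSettings().build_redis_dsn(...)`` is assumed from the surrounding codebase; only the overridden method names and the public ``create``/``remove``/``setup``/``shutdown`` calls come from the patches above.

    import asyncio

    from pydantic import BaseModel
    from servicelib.redis import RedisClientSDK
    from settings_library.redis import RedisDatabase, RedisSettings
    from simcore_service_director_v2.utils.base_distributed_identifier import (
        BaseDistributedIdentifierManager,
    )


    class FakeCleanupContext(BaseModel):
        # everything ``is_used`` and ``_destroy`` need later; stored in Redis
        # next to the identifier so cleanup survives a service restart
        owner: str


    class FakeAPI:
        # hypothetical external service that creates and deletes resources
        def __init__(self) -> None:
            self._items: dict[str, str] = {}

        def create(self, payload: str) -> str:
            identifier = f"id-{len(self._items)}"
            self._items[identifier] = payload
            return identifier

        def get(self, identifier: str) -> str | None:
            return self._items.get(identifier)

        def delete(self, identifier: str) -> None:
            self._items.pop(identifier, None)


    class FakeResourcesManager(
        BaseDistributedIdentifierManager[str, str, FakeCleanupContext]
    ):
        def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
            super().__init__(redis_client_sdk)
            self.api = FakeAPI()

        @classmethod
        def _deserialize_identifier(cls, raw: str) -> str:
            return raw

        @classmethod
        def _serialize_identifier(cls, identifier: str) -> str:
            return identifier

        @classmethod
        def _deserialize_cleanup_context(cls, raw: str) -> FakeCleanupContext:
            return FakeCleanupContext.parse_raw(raw)

        @classmethod
        def _serialize_cleanup_context(cls, cleanup_context: FakeCleanupContext) -> str:
            return cleanup_context.json()

        async def is_used(
            self, identifier: str, cleanup_context: FakeCleanupContext
        ) -> bool:
            # a real manager asks the component owning the resource, e.g. the
            # dynamic sidecars scheduler in api_keys_manager.py
            return self.api.get(identifier) is not None

        async def _create(  # type:ignore [override] # pylint:disable=arguments-differ
            self, payload: str
        ) -> tuple[str, str]:
            identifier = self.api.create(payload)
            return identifier, payload

        async def get(self, identifier: str, **_) -> str | None:
            return self.api.get(identifier)

        async def _destroy(self, identifier: str, _: FakeCleanupContext) -> None:
            self.api.delete(identifier)


    async def main() -> None:
        client = RedisClientSDK(
            RedisSettings().build_redis_dsn(RedisDatabase.DISTRIBUTED_IDENTIFIERS)
        )
        manager = FakeResourcesManager(client)
        await manager.setup()  # starts the periodic cleanup task

        identifier, _ = await manager.create(
            cleanup_context=FakeCleanupContext(owner="some-service"), payload="hello"
        )
        assert await manager.get(identifier) == "hello"

        await manager.remove(identifier)  # also drops the Redis bookkeeping key
        await manager.shutdown()


    asyncio.run(main())

Note that, per PATCH 15, ``remove`` also deletes the bookkeeping key from Redis, and per PATCH 12 the constructor rejects any client that is not connected to the ``DISTRIBUTED_IDENTIFIERS`` database.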