♻️ director-v2 created api-keys are removed if not used by any service ⚠️ #5043

Merged · 24 commits · Nov 20, 2023
Changes shown from all commits.
3 changes: 0 additions & 3 deletions packages/service-library/src/servicelib/redis.py
@@ -21,9 +21,6 @@
 from .logging_utils import log_catch, log_context

 _DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
-_CANCEL_TASK_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=0.1)
-_MINUTE: Final[NonNegativeFloat] = 60
-_WAIT_SECS: Final[NonNegativeFloat] = 1


 logger = logging.getLogger(__name__)
1 change: 1 addition & 0 deletions packages/settings-library/src/settings_library/redis.py
@@ -15,6 +15,7 @@ class RedisDatabase(int, Enum):
     SCHEDULED_MAINTENANCE = 3
     USER_NOTIFICATIONS = 4
     ANNOUNCEMENTS = 5
+    DISTRIBUTED_IDENTIFIERS = 6


 class RedisSettings(BaseCustomSettings):
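The new enum member maps directly to Redis database index 6. A minimal sketch of how a client selects it, assuming the usual settings bootstrap from environment variables and using build_redis_dsn exactly as the director-v2 setup code further down in this diff does; host and port values are illustrative:

# Minimal sketch, assuming REDIS_HOST/REDIS_PORT are set in the environment.
from settings_library.redis import RedisDatabase, RedisSettings

settings = RedisSettings.create_from_envs()  # e.g. REDIS_HOST=redis, REDIS_PORT=6379
dsn = settings.build_redis_dsn(RedisDatabase.DISTRIBUTED_IDENTIFIERS)
print(dsn)  # roughly redis://redis:6379/6 -- the trailing 6 is the database index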
10 changes: 9 additions & 1 deletion services/autoscaling/tests/manual/docker-compose.yml
@@ -40,7 +40,15 @@ services:
     ports:
       - "18081:8081"
     environment:
-      - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5
+      - >-
+        REDIS_HOSTS=
+        resources:${REDIS_HOST}:${REDIS_PORT}:0,
+        locks:${REDIS_HOST}:${REDIS_PORT}:1,
+        validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,
+        scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,
+        user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,
+        announcements:${REDIS_HOST}:${REDIS_PORT}:5,
+        distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6
       # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml

   autoscaling:
10 changes: 9 additions & 1 deletion services/clusters-keeper/docker-compose.yml
@@ -35,7 +35,15 @@ services:
     ports:
       - "18081:8081"
     environment:
-      - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5
+      - >-
+        REDIS_HOSTS=
+        resources:${REDIS_HOST}:${REDIS_PORT}:0,
+        locks:${REDIS_HOST}:${REDIS_PORT}:1,
+        validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,
+        scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,
+        user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,
+        announcements:${REDIS_HOST}:${REDIS_PORT}:5,
+        distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6
       # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml

   clusters-keeper:
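Both compose files feed redis-commander the same list, now including the new database: each REDIS_HOSTS entry has the form label:host:port:db, and the YAML folded scalar only splits the previously unreadable one-liner. A hypothetical helper (not part of the repo, for illustration only) shows how the value lines up with the RedisDatabase enum:

# Hypothetical helper, for illustration only: derives the redis-commander
# REDIS_HOSTS value ("label:host:port:db" entries, comma-separated) from the
# RedisDatabase enum so the compose files and the enum stay in sync.
from settings_library.redis import RedisDatabase


def redis_commander_hosts(host: str, port: int) -> str:
    return ",".join(
        f"{db.name.lower()}:{host}:{port}:{db.value}" for db in RedisDatabase
    )


print(redis_commander_hosts("redis", 6379))
# resources:redis:6379:0,locks:redis:6379:1,...,distributed_identifiers:redis:6379:6

As the in-file comment says, the Redis server's --databases setting must also cover index 6.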
services/director-v2/src/simcore_service_director_v2/modules/api_keys_manager.py
@@ -1,3 +1,5 @@
+from datetime import timedelta
+from typing import Any
 from uuid import uuid5

 from fastapi import FastAPI
@@ -6,38 +8,62 @@
 from models_library.products import ProductName
 from models_library.projects_nodes_io import NodeID
 from models_library.rabbitmq_basic_types import RPCMethodName
+from models_library.services import RunID
 from models_library.users import UserID
-from pydantic import parse_obj_as
+from pydantic import BaseModel, StrBytes, parse_obj_as
 from servicelib.rabbitmq import RabbitMQRPCClient
+from servicelib.redis import RedisClientSDK
+from settings_library.redis import RedisDatabase

-from ..utils.distributed_identifer import BaseDistributedIdentifierManager
+from ..core.settings import AppSettings
+from ..utils.base_distributed_identifier import BaseDistributedIdentifierManager
 from .rabbitmq import get_rabbitmq_rpc_client

+_CLEANUP_INTERVAL = timedelta(minutes=5)
+
-class APIKeysManager(BaseDistributedIdentifierManager[str, ApiKeyGet]):
-    def __init__(self, app: FastAPI) -> None:
-        self.GET_OR_CREATE_INJECTS_IDENTIFIER = True
+
+class CleanupContext(BaseModel):
+    # used for checking if used
+    node_id: NodeID
+
+    # used for removing
+    product_name: ProductName
+    user_id: UserID
+
+
+class APIKeysManager(BaseDistributedIdentifierManager[str, ApiKeyGet, CleanupContext]):
+    def __init__(self, app: FastAPI, redis_client_sdk: RedisClientSDK) -> None:
+        super().__init__(redis_client_sdk, cleanup_interval=_CLEANUP_INTERVAL)
         self.app = app

     @property
     def rpc_client(self) -> RabbitMQRPCClient:
         return get_rabbitmq_rpc_client(self.app)

+    # pylint:disable=arguments-differ
+    @classmethod
+    def _deserialize_identifier(cls, raw: str) -> str:
+        return raw
+
-    async def get(  # type:ignore [override]
-        self, identifier: str, product_name: ProductName, user_id: UserID
-    ) -> ApiKeyGet | None:
-        result = await self.rpc_client.request(
-            WEBSERVER_RPC_NAMESPACE,
-            parse_obj_as(RPCMethodName, "api_key_get"),
-            product_name=product_name,
-            user_id=user_id,
-            name=identifier,
-        )
+    @classmethod
+    def _serialize_identifier(cls, identifier: str) -> str:
+        return identifier
+
+    @classmethod
+    def _deserialize_cleanup_context(cls, raw: StrBytes) -> CleanupContext:
+        return CleanupContext.parse_raw(raw)
+
+    @classmethod
+    def _serialize_cleanup_context(cls, cleanup_context: CleanupContext) -> str:
+        return cleanup_context.json()
+
+    async def is_used(self, identifier: str, cleanup_context: CleanupContext) -> bool:
+        _ = identifier
+        scheduler: "DynamicSidecarsScheduler" = (  # type:ignore [name-defined] # noqa: F821
+            self.app.state.dynamic_sidecar_scheduler
         )
-        return parse_obj_as(ApiKeyGet | None, result)
+        return bool(scheduler.is_service_tracked(cleanup_context.node_id))

-    async def create(  # type:ignore [override]
+    async def _create(  # type:ignore [override] # pylint:disable=arguments-differ
         self, identifier: str, product_name: ProductName, user_id: UserID
     ) -> tuple[str, ApiKeyGet]:
         result = await self.rpc_client.request(
@@ -49,49 +75,68 @@ async def create(  # type:ignore [override]
         )
         return identifier, ApiKeyGet.parse_obj(result)

-    async def destroy(  # type:ignore [override]
+    async def get(  # type:ignore [override] # pylint:disable=arguments-differ
         self, identifier: str, product_name: ProductName, user_id: UserID
-    ) -> None:
-        await self.rpc_client.request(
+    ) -> ApiKeyGet | None:
+        result: Any | None = await self.rpc_client.request(
             WEBSERVER_RPC_NAMESPACE,
-            parse_obj_as(RPCMethodName, "delete_api_keys"),
+            parse_obj_as(RPCMethodName, "api_key_get"),
             product_name=product_name,
             user_id=user_id,
             name=identifier,
         )
+        return parse_obj_as(ApiKeyGet | None, result)
+
+    async def _destroy(self, identifier: str, cleanup_context: CleanupContext) -> None:
+        await self.rpc_client.request(
+            WEBSERVER_RPC_NAMESPACE,
+            parse_obj_as(RPCMethodName, "delete_api_keys"),
+            product_name=cleanup_context.product_name,
+            user_id=cleanup_context.user_id,
+            name=identifier,
+        )


 async def get_or_create_api_key(
-    app: FastAPI, *, product_name: ProductName, user_id: UserID, node_id: NodeID
+    app: FastAPI,
+    *,
+    product_name: ProductName,
+    user_id: UserID,
+    node_id: NodeID,
+    run_id: RunID,
 ) -> ApiKeyGet:
     api_keys_manager = _get_api_keys_manager(app)
-    display_name = _get_api_key_name(node_id)
+    display_name = _get_api_key_name(node_id, run_id)

-    key_data: ApiKeyGet | None = await api_keys_manager.get(
+    api_key: ApiKeyGet | None = await api_keys_manager.get(
         identifier=display_name, product_name=product_name, user_id=user_id
     )
-    if key_data is None:
-        _, key_data = await api_keys_manager.create(
-            identifier=display_name, product_name=product_name, user_id=user_id
+    if api_key is None:
+        _, api_key = await api_keys_manager.create(
+            cleanup_context=CleanupContext(
+                node_id=node_id, product_name=product_name, user_id=user_id
+            ),
+            identifier=display_name,
+            product_name=product_name,
+            user_id=user_id,
         )

-    return key_data
+    return api_key


async def safe_remove(
app: FastAPI, *, node_id: NodeID, product_name: ProductName, user_id: UserID
) -> None:
async def safe_remove(app: FastAPI, *, node_id: NodeID, run_id: RunID) -> None:
api_keys_manager = _get_api_keys_manager(app)
display_name = _get_api_key_name(node_id)
display_name = _get_api_key_name(node_id, run_id)

await api_keys_manager.remove(
identifier=display_name, product_name=product_name, user_id=user_id
)
await api_keys_manager.remove(identifier=display_name)


def _get_api_key_name(node_id: NodeID) -> str:
obfuscated_node_id = uuid5(node_id, f"{node_id}")
return f"_auto_{obfuscated_node_id}"
def _get_api_key_name(node_id: NodeID, run_id: RunID) -> str:
# Generates a new unique key name for each service run
# This avoids race conditions if the service is starting and
# an the cleanup job is removing the key from an old run which
# was wrongly shut down
return f"_auto_{uuid5(node_id, run_id)}"


def _get_api_keys_manager(app: FastAPI) -> APIKeysManager:
@@ -101,6 +146,19 @@ def _get_api_keys_manager(app: FastAPI) -> APIKeysManager:

 def setup(app: FastAPI) -> None:
     async def on_startup() -> None:
-        app.state.api_keys_manager = APIKeysManager(app)
+        settings: AppSettings = app.state.settings
+
+        redis_dsn = settings.REDIS.build_redis_dsn(
+            RedisDatabase.DISTRIBUTED_IDENTIFIERS
+        )
+        redis_client_sdk = RedisClientSDK(redis_dsn)
+        app.state.api_keys_manager = manager = APIKeysManager(app, redis_client_sdk)
+
+        await manager.setup()
+
+    async def on_shutdown() -> None:
+        manager: APIKeysManager = app.state.api_keys_manager
+        await manager.shutdown()

     app.add_event_handler("startup", on_startup)
+    app.add_event_handler("shutdown", on_shutdown)
services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_compose_specs.py
@@ -12,7 +12,7 @@
     PathMappingsLabel,
     SimcoreServiceLabels,
 )
-from models_library.services import ServiceKey, ServiceVersion
+from models_library.services import RunID, ServiceKey, ServiceVersion
 from models_library.services_resources import (
     DEFAULT_SINGLE_SERVICE_NAME,
     ResourcesDict,
@@ -275,6 +275,7 @@ async def assemble_spec(  # pylint: disable=too-many-arguments # noqa: PLR0913
     node_id: NodeID,
     simcore_user_agent: str,
     swarm_stack_name: str,
+    run_id: RunID,
 ) -> str:
     """
     returns a docker-compose spec used by
@@ -391,6 +392,7 @@ async def assemble_spec(  # pylint: disable=too-many-arguments # noqa: PLR0913
         product_name=product_name,
         user_id=user_id,
         node_id=node_id,
+        run_id=run_id,
     )

     stringified_service_spec: str = replace_env_vars_in_compose_spec(
services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py
@@ -98,6 +98,7 @@ async def create_user_services(app: FastAPI, scheduler_data: SchedulerData):
         node_id=scheduler_data.node_uuid,
         simcore_user_agent=scheduler_data.request_simcore_user_agent,
         swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME,
+        run_id=scheduler_data.run_id,
     )

     _logger.debug(
services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py
@@ -310,10 +310,7 @@ async def attempt_pod_removal_and_data_saving(
     )

     await safe_remove(
-        app,
-        node_id=scheduler_data.node_uuid,
-        product_name=scheduler_data.product_name,
-        user_id=scheduler_data.user_id,
+        app, node_id=scheduler_data.node_uuid, run_id=scheduler_data.run_id
     )

     # remove sidecar's api client
services/director-v2/src/simcore_service_director_v2/modules/osparc_variables_substitutions.py
@@ -12,7 +12,7 @@
 from models_library.products import ProductName
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
-from models_library.services import ServiceKey, ServiceVersion
+from models_library.services import RunID, ServiceKey, ServiceVersion
 from models_library.users import UserID
 from models_library.utils.specs_substitution import SpecsSubstitutionsResolver
 from pydantic import BaseModel, EmailStr
@@ -187,6 +187,7 @@ async def resolve_and_substitute_service_lifetime_variables_in_specs(
     product_name: ProductName,
     user_id: UserID,
     node_id: NodeID,
+    run_id: RunID,
     safe: bool = True,
 ) -> dict[str, Any]:
     registry: OsparcVariablesTable = app.state.lifespan_osparc_variables_table
@@ -204,6 +205,7 @@ async def resolve_and_substitute_service_lifetime_variables_in_specs(
             product_name=product_name,
             user_id=user_id,
             node_id=node_id,
+            run_id=run_id,
         ),
         # NOTE: the api key and secret cannot be resolved in parallel
         # due to race conditions
@@ -218,25 +220,35 @@


 async def _get_or_create_api_key(
-    app: FastAPI, product_name: ProductName, user_id: UserID, node_id: NodeID
+    app: FastAPI,
+    product_name: ProductName,
+    user_id: UserID,
+    node_id: NodeID,
+    run_id: RunID,
 ) -> str:
     key_data = await get_or_create_api_key(
         app,
         product_name=product_name,
         user_id=user_id,
         node_id=node_id,
+        run_id=run_id,
     )
     return key_data.api_key  # type:ignore [no-any-return]


 async def _get_or_create_api_secret(
-    app: FastAPI, product_name: ProductName, user_id: UserID, node_id: NodeID
+    app: FastAPI,
+    product_name: ProductName,
+    user_id: UserID,
+    node_id: NodeID,
+    run_id: RunID,
 ) -> str:
     key_data = await get_or_create_api_key(
         app,
         product_name=product_name,
         user_id=user_id,
         node_id=node_id,
+        run_id=run_id,
     )
     return key_data.api_secret  # type:ignore [no-any-return]

@@ -271,7 +283,6 @@ def _setup_session_osparc_variables(app: FastAPI):
         ("OSPARC_VARIABLE_PRODUCT_NAME", "product_name"),
         ("OSPARC_VARIABLE_STUDY_UUID", "project_id"),
         ("OSPARC_VARIABLE_NODE_ID", "node_id"),
-        # ANE -> PC: why not register the user_id as well at this point?
     ]:
         table.register_from_context(name, context_name)
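The session-scoped variables above are filled straight from the request context, while the API key and secret go through _get_or_create_api_key and _get_or_create_api_secret, which is why run_id must now reach this module too. Below is an illustrative sketch of the resolution order (the actual OsparcVariablesTable wiring is not shown in this diff, and resolve_api_auth is a hypothetical helper); it demonstrates the NOTE above that the two lookups must run sequentially, since both get-or-create the same underlying key:

# Illustrative sketch only: demonstrates the sequential resolution noted in
# the diff. resolve_api_auth is a hypothetical helper, not the repo's API.
from fastapi import FastAPI
from models_library.products import ProductName
from models_library.projects_nodes_io import NodeID
from models_library.services import RunID
from models_library.users import UserID


async def resolve_api_auth(
    app: FastAPI,
    *,
    product_name: ProductName,
    user_id: UserID,
    node_id: NodeID,
    run_id: RunID,
) -> dict[str, str]:
    # sequential on purpose: running these via asyncio.gather could create
    # the same API key twice (the get-or-create race the NOTE refers to)
    api_key = await _get_or_create_api_key(
        app, product_name=product_name, user_id=user_id, node_id=node_id, run_id=run_id
    )
    api_secret = await _get_or_create_api_secret(
        app, product_name=product_name, user_id=user_id, node_id=node_id, run_id=run_id
    )
    return {
        "OSPARC_VARIABLE_API_KEY": api_key,
        "OSPARC_VARIABLE_API_SECRET": api_secret,
    }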
