Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨Autoscaling: chunk prepulled image AWS EC2 tags #6232

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions packages/aws-library/src/aws_library/ec2/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,17 @@ class EC2InstanceType:

class AWSTagKey(ConstrainedStr):
    """Constrained string type for an AWS EC2 tag key.

    Accepts 1 to 128 characters taken from ASCII letters, digits and
    ``+ - = . _ : @``. The exact values ``_index``, ``.`` and ``..`` are
    rejected by the negative look-ahead at the start of the pattern.
    """

    # see [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html#tag-restrictions]
    # NOTE: the length limits are not part of the pattern; they are enforced
    # through min_length/max_length below
    regex = re.compile(r"^(?!(_index|\.{1,2})$)[a-zA-Z0-9\+\-=\._:@]+$")
    min_length = 1
    max_length = 128


class AWSTagValue(ConstrainedStr):
    """Constrained string type for an AWS EC2 tag value.

    Accepts 0 to 256 characters taken from ASCII letters, digits, whitespace
    and ``+ - = . , _ : / @ " ' [ ] { }``.
    """

    # see [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html#tag-restrictions]
    # Quotes, [] and {} are part of the allowed set so that JSON-encoded data
    # can be stored as a value; AWS seems to accept them.
    # NOTE: the length limits are not part of the pattern; they are enforced
    # through min_length/max_length below
    regex = re.compile(r"^[a-zA-Z0-9\s\+\-=\.,_:/@\"\'\[\]\{\}]*$")
    min_length = 0
    max_length = 256


EC2Tags: TypeAlias = dict[AWSTagKey, AWSTagValue]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import re
from typing import Final

from aws_library.ec2._models import AWSTagKey, AWSTagValue, EC2Tags
from pydantic import parse_obj_as

# EC2 tag keys related to the pulling of docker images on buffer machines
BUFFER_MACHINE_PULLING_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "pulling"
)
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "ssm-command-id"
)
PREPULL_COMMAND_NAME: Final[str] = "docker images pulling"

DOCKER_PULL_COMMAND: Final[
    str
] = "docker compose -f /docker-pull.compose.yml -p buffering pull"

# EC2 tag key under which the list of pre-pulled images is stored
PRE_PULLED_IMAGES_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "io.simcore.autoscaling.pre_pulled_images"
)

# EC2 tag key (and ready-made tag sets) flagging an instance as a buffer machine
BUFFER_MACHINE_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "io.simcore.autoscaling.buffer_machine"
)
DEACTIVATED_BUFFER_MACHINE_EC2_TAGS: Final[EC2Tags] = {
    BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "true")
}
ACTIVATED_BUFFER_MACHINE_EC2_TAGS: Final[EC2Tags] = {
    BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "false")
}

# Matches the chunked form of the pre-pulled images tag key, i.e.
# "<PRE_PULLED_IMAGES_EC2_TAG_KEY>_(<index>)", capturing <index>.
# The key is escaped because it contains dots, which would otherwise match
# any character.
PRE_PULLED_IMAGES_RE: Final[re.Pattern] = re.compile(
    rf"{re.escape(PRE_PULLED_IMAGES_EC2_TAG_KEY)}_\((\d+)\)"
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

import logging
from collections import defaultdict
from typing import Final, TypeAlias, cast
from typing import TypeAlias, cast

from aws_library.ec2 import (
AWSTagKey,
AWSTagValue,
EC2InstanceConfig,
EC2InstanceData,
Expand All @@ -31,31 +30,28 @@
SSMCommandExecutionTimeoutError,
)
from fastapi import FastAPI
from models_library.utils.json_serialization import json_dumps, json_loads
from pydantic import NonNegativeInt, parse_obj_as
from pydantic import NonNegativeInt
from servicelib.logging_utils import log_context
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..constants import (
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
BUFFER_MACHINE_PULLING_EC2_TAG_KEY,
DOCKER_PULL_COMMAND,
PREPULL_COMMAND_NAME,
)
from ..core.settings import get_application_settings
from ..models import BufferPool, BufferPoolManager
from ..utils.auto_scaling_core import ec2_buffer_startup_script
from ..utils.buffer_machines_pool_core import get_deactivated_buffer_ec2_tags
from ..utils.buffer_machines_pool_core import (
dump_pre_pulled_images_as_tags,
get_deactivated_buffer_ec2_tags,
load_pre_pulled_images_from_tags,
)
from .auto_scaling_mode_base import BaseAutoscaling
from .ec2 import get_ec2_client
from .ssm import get_ssm_client

_BUFFER_MACHINE_PULLING_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "pulling"
)
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "ssm-command-id"
)
_PREPULL_COMMAND_NAME: Final[str] = "docker images pulling"
_PRE_PULLED_IMAGES_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "io.simcore.autoscaling.pre_pulled_images"
)


_logger = logging.getLogger(__name__)


Expand All @@ -64,7 +60,7 @@ async def _analyze_running_instance_state(
):
ssm_client = get_ssm_client(app)

if _BUFFER_MACHINE_PULLING_EC2_TAG_KEY in instance.tags:
if BUFFER_MACHINE_PULLING_EC2_TAG_KEY in instance.tags:
buffer_pool.pulling_instances.add(instance)
elif await ssm_client.is_instance_connected_to_ssm_server(instance.id):
app_settings = get_application_settings(app)
Expand Down Expand Up @@ -176,9 +172,7 @@ async def _terminate_instances_with_invalid_pre_pulled_images(

for instance in all_pre_pulled_instances:
if (
pre_pulled_images := json_loads(
instance.tags.get(_PRE_PULLED_IMAGES_EC2_TAG_KEY, "[]")
)
pre_pulled_images := load_pre_pulled_images_from_tags(instance.tags)
) and pre_pulled_images != ec2_boot_config.pre_pull_images:
_logger.info(
"%s",
Expand Down Expand Up @@ -269,9 +263,6 @@ async def _add_remove_buffer_instances(

InstancesToStop: TypeAlias = set[EC2InstanceData]
InstancesToTerminate: TypeAlias = set[EC2InstanceData]
_DOCKER_PULL_COMMAND: Final[
str
] = "docker compose -f /docker-pull.compose.yml -p buffering pull"


async def _handle_pool_image_pulling(
Expand All @@ -283,14 +274,14 @@ async def _handle_pool_image_pulling(
# trigger the image pulling
ssm_command = await ssm_client.send_command(
[instance.id for instance in pool.waiting_to_pull_instances],
command=_DOCKER_PULL_COMMAND,
command_name=_PREPULL_COMMAND_NAME,
command=DOCKER_PULL_COMMAND,
command_name=PREPULL_COMMAND_NAME,
)
await ec2_client.set_instances_tags(
tuple(pool.waiting_to_pull_instances),
tags={
_BUFFER_MACHINE_PULLING_EC2_TAG_KEY: AWSTagValue("true"),
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: AWSTagValue(
BUFFER_MACHINE_PULLING_EC2_TAG_KEY: AWSTagValue("true"),
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: AWSTagValue(
ssm_command.command_id
),
},
Expand All @@ -301,7 +292,7 @@ async def _handle_pool_image_pulling(
# wait for the image pulling to complete
for instance in pool.pulling_instances:
if ssm_command_id := instance.tags.get(
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY
):
ssm_command = await ssm_client.get_command(
instance.id, command_id=ssm_command_id
Expand All @@ -323,15 +314,11 @@ async def _handle_pool_image_pulling(
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
await ec2_client.set_instances_tags(
tuple(instances_to_stop),
tags={
_PRE_PULLED_IMAGES_EC2_TAG_KEY: AWSTagValue(
json_dumps(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
instance_type
].pre_pull_images
)
)
},
tags=dump_pre_pulled_images_as_tags(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
instance_type
].pre_pull_images
),
)
return instances_to_stop, broken_instances_to_terminate

Expand All @@ -357,8 +344,8 @@ async def _handle_image_pre_pulling(
"pending buffer instances completed pulling of images, stopping them",
):
tag_keys_to_remove = (
_BUFFER_MACHINE_PULLING_EC2_TAG_KEY,
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
BUFFER_MACHINE_PULLING_EC2_TAG_KEY,
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
)
await ec2_client.remove_instances_tags(
tuple(instances_to_stop),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
from typing import Final
from collections.abc import Iterable
from operator import itemgetter

from aws_library.ec2 import AWSTagKey, AWSTagValue, EC2Tags
from fastapi import FastAPI
from pydantic import parse_obj_as
from models_library.docker import DockerGenericTag
from models_library.utils.json_serialization import json_dumps
from pydantic import parse_obj_as, parse_raw_as

from ..modules.auto_scaling_mode_base import BaseAutoscaling

_BUFFER_MACHINE_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "io.simcore.autoscaling.buffer_machine"
from ..constants import (
ACTIVATED_BUFFER_MACHINE_EC2_TAGS,
BUFFER_MACHINE_TAG_KEY,
DEACTIVATED_BUFFER_MACHINE_EC2_TAGS,
PRE_PULLED_IMAGES_EC2_TAG_KEY,
PRE_PULLED_IMAGES_RE,
)
_DEACTIVATED_BUFFER_MACHINE_EC2_TAGS: Final[EC2Tags] = {
_BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "true")
}
_ACTIVATED_BUFFER_MACHINE_EC2_TAGS: Final[EC2Tags] = {
_BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "false")
}
from ..modules.auto_scaling_mode_base import BaseAutoscaling


def get_activated_buffer_ec2_tags(
    app: FastAPI, auto_scaling_mode: BaseAutoscaling
) -> EC2Tags:
    """Return the EC2 tags of an activated buffer machine.

    The tags provided by ``auto_scaling_mode.get_ec2_tags(app)`` are merged
    with ``ACTIVATED_BUFFER_MACHINE_EC2_TAGS``; on a key collision the latter
    takes precedence (right-hand operand of ``|``).
    """
    return auto_scaling_mode.get_ec2_tags(app) | ACTIVATED_BUFFER_MACHINE_EC2_TAGS


def get_deactivated_buffer_ec2_tags(
app: FastAPI, auto_scaling_mode: BaseAutoscaling
) -> EC2Tags:
base_ec2_tags = (
auto_scaling_mode.get_ec2_tags(app) | _DEACTIVATED_BUFFER_MACHINE_EC2_TAGS
auto_scaling_mode.get_ec2_tags(app) | DEACTIVATED_BUFFER_MACHINE_EC2_TAGS
)
base_ec2_tags[AWSTagKey("Name")] = AWSTagValue(
f"{base_ec2_tags[AWSTagKey('Name')]}-buffer"
Expand All @@ -36,4 +36,49 @@ def get_deactivated_buffer_ec2_tags(


def is_buffer_machine(tags: EC2Tags) -> bool:
    """Return True when ``tags`` contain the buffer machine tag key.

    Only the presence of the key is checked, not its value.
    """
    return BUFFER_MACHINE_TAG_KEY in tags


def dump_pre_pulled_images_as_tags(images: Iterable[DockerGenericTag]) -> EC2Tags:
    """Serialize the pre-pulled images into EC2 tags.

    The images are JSON-encoded. If the encoded string fits in a single tag
    value it is stored under ``PRE_PULLED_IMAGES_EC2_TAG_KEY``. Otherwise it
    is cut into pieces of at most ``AWSTagValue.max_length`` characters, each
    stored under ``"<PRE_PULLED_IMAGES_EC2_TAG_KEY>_(<index>)"`` with
    ``<index>`` starting at 0.

    Arguments:
        images -- the docker images to encode

    Returns:
        the tag(s) holding the encoded images
    """
    # AWS Tag Values are limited to 256 characters so we chunk the images
    # into smaller chunks
    # NOTE: serialize only once and reuse the result, ``images`` might be an
    # iterable that can only be consumed a single time
    jsonized_images = json_dumps(images)
    assert AWSTagValue.max_length  # nosec
    chunk_size = AWSTagValue.max_length
    if len(jsonized_images) <= chunk_size:
        return {
            PRE_PULLED_IMAGES_EC2_TAG_KEY: parse_obj_as(AWSTagValue, jsonized_images)
        }

    # let's chunk the string
    chunks = [
        jsonized_images[i : i + chunk_size]
        for i in range(0, len(jsonized_images), chunk_size)
    ]
    return {
        AWSTagKey(f"{PRE_PULLED_IMAGES_EC2_TAG_KEY}_({i})"): AWSTagValue(c)
        for i, c in enumerate(chunks)
    }


def load_pre_pulled_images_from_tags(tags: EC2Tags) -> list[DockerGenericTag]:
    """Rebuild the list of pre-pulled images out of EC2 tags.

    When ``PRE_PULLED_IMAGES_EC2_TAG_KEY`` is present its value is parsed
    directly and any chunked tag is ignored. Otherwise the values of all the
    tags whose key matches ``PRE_PULLED_IMAGES_RE`` are concatenated by
    ascending chunk index and the result is parsed.

    Arguments:
        tags -- the EC2 tags to read from

    Returns:
        the decoded images, or an empty list if no such tag was found
    """
    # AWS Tag values are limited to 256 characters so the images may be chunked
    if PRE_PULLED_IMAGES_EC2_TAG_KEY in tags:
        # not chunked: read directly
        return parse_raw_as(list[DockerGenericTag], tags[PRE_PULLED_IMAGES_EC2_TAG_KEY])

    # collect (chunk index, chunk content) for every chunked tag
    indexed_chunks = []
    for tag_key, tag_value in tags.items():
        found = PRE_PULLED_IMAGES_RE.match(tag_key)
        if found:
            indexed_chunks.append((int(found.group(1)), tag_value))
    # the tags come in no guaranteed order, the index defines the sequence
    indexed_chunks.sort(key=itemgetter(0))

    assembled_json = "".join(chunk for _, chunk in indexed_chunks)
    if not assembled_json:
        return []
    return parse_raw_as(list[DockerGenericTag], assembled_json)
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
)
from pytest_simcore.helpers.logging_tools import log_context
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
from simcore_service_autoscaling.constants import PRE_PULLED_IMAGES_EC2_TAG_KEY
from simcore_service_autoscaling.core.settings import ApplicationSettings
from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import (
DynamicAutoscaling,
)
from simcore_service_autoscaling.modules.buffer_machines_pool_core import (
_PRE_PULLED_IMAGES_EC2_TAG_KEY,
monitor_buffer_machines,
)
from simcore_service_autoscaling.utils.buffer_machines_pool_core import (
Expand Down Expand Up @@ -311,7 +311,7 @@ async def _assert_wait_for_ssm_command_to_finish() -> None:
expected_instance_type=next(iter(ec2_instances_allowed_types)),
expected_instance_state="stopped",
expected_additional_tag_keys=[
_PRE_PULLED_IMAGES_EC2_TAG_KEY,
PRE_PULLED_IMAGES_EC2_TAG_KEY,
*list(ec2_instance_custom_tags),
],
expected_pre_pulled_images=pre_pulled_images,
Expand Down Expand Up @@ -376,7 +376,7 @@ async def _do(
if pre_pull_images is not None and instance_state_name == "stopped":
resource_tags.append(
{
"Key": _PRE_PULLED_IMAGES_EC2_TAG_KEY,
"Key": PRE_PULLED_IMAGES_EC2_TAG_KEY,
"Value": f"{json_dumps(pre_pull_images)}",
}
)
Expand Down
Loading
Loading