Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 96ba5f5
Author: sanderegg <[email protected]>
Date:   Tue Dec 5 10:19:31 2023 +0100

    use method to create hardware constraint and read
  • Loading branch information
sanderegg committed Dec 5, 2023
1 parent 4cbb637 commit 0da3023
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Any, TypeAlias

from .constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY

DaskTaskResources: TypeAlias = dict[str, Any]


def create_ec2_resource_constraint_key(ec2_instance_type: str) -> str:
return f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{ec2_instance_type}"


def get_ec2_instance_type_from_resources(
task_resources: DaskTaskResources,
) -> str | None:
for resource_name in task_resources:
if resource_name.startswith(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY):
return resource_name.split(":")[-1]
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from dask_task_models_library.resource_constraints import (
create_ec2_resource_constraint_key,
get_ec2_instance_type_from_resources,
)
from faker import Faker


def test_create_ec2_resource_constraint_key(faker: Faker):
faker_instance_type = faker.pystr()
assert (
create_ec2_resource_constraint_key(faker_instance_type)
== f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{faker_instance_type}"
)

empty_instance_type = ""
assert (
create_ec2_resource_constraint_key(empty_instance_type)
== f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:"
)


def test_get_ec2_instance_type_from_resources(faker: Faker):
empty_task_resources = {}
assert get_ec2_instance_type_from_resources(empty_task_resources) is None
no_ec2_types_in_resources = {"blahblah": 1}
assert get_ec2_instance_type_from_resources(no_ec2_types_in_resources) is None

faker_instance_type = faker.pystr()
ec2_type_in_resources = {create_ec2_resource_constraint_key(faker_instance_type): 1}
assert (
get_ec2_instance_type_from_resources(ec2_type_in_resources)
== faker_instance_type
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from dataclasses import dataclass, field
from typing import Any, TypeAlias
from typing import TypeAlias

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from dask_task_models_library.resource_constraints import DaskTaskResources
from models_library.generated_models.docker_rest_api import Node


Expand Down Expand Up @@ -55,7 +56,6 @@ class Cluster:


DaskTaskId: TypeAlias = str
DaskTaskResources: TypeAlias = dict[str, Any]


@dataclass(frozen=True, kw_only=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from typing import Final

from aws_library.ec2.models import Resources
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from dask_task_models_library.resource_constraints import (
get_ec2_instance_type_from_resources,
)
from fastapi import FastAPI
from servicelib.utils_formatting import timedelta_as_minute_second
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.settings import get_application_settings
from ..models import (
Expand All @@ -30,8 +31,8 @@ def get_max_resources_from_dask_task(task: DaskTask) -> Resources:
)


def get_task_instance_restriction(task: DaskTask) -> InstanceTypeType | None:
return task.required_resources.get(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY)
def get_task_instance_restriction(task: DaskTask) -> str | None:
return get_ec2_instance_type_from_resources(task.required_resources)


def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import distributed
import pytest
from aws_library.ec2.models import Resources
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from dask_task_models_library.resource_constraints import (
create_ec2_resource_constraint_key,
)
from faker import Faker
from fastapi import FastAPI
from models_library.docker import DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY
Expand Down Expand Up @@ -283,7 +285,7 @@ def _do(
}
)
if ec2_instance_type is not None:
resources[DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY] = ec2_instance_type
resources[create_ec2_resource_constraint_key(ec2_instance_type)] = 1
return resources

return _do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from typing import Any

import distributed
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from dask_task_models_library.container_tasks.docker import DockerBasicAuth
from dask_task_models_library.container_tasks.errors import TaskCancelledError
from dask_task_models_library.container_tasks.io import (
Expand All @@ -31,6 +30,9 @@
ContainerTaskParameters,
LogFileUploadURL,
)
from dask_task_models_library.resource_constraints import (
create_ec2_resource_constraint_key,
)
from distributed.scheduler import TaskStateState as DaskSchedulerTaskState
from fastapi import FastAPI
from models_library.api_schemas_directorv2.clusters import ClusterDetails, Scheduler
Expand Down Expand Up @@ -228,7 +230,9 @@ def _comp_sidecar_fct(
)
if hardware_info.aws_ec2_instances:
dask_resources[
f"{DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY}:{hardware_info.aws_ec2_instances[0]}"
create_ec2_resource_constraint_key(
hardware_info.aws_ec2_instances[0]
)
] = 1

dask_utils.check_scheduler_is_still_the_same(
Expand Down

0 comments on commit 0da3023

Please sign in to comment.