[Core] gpu memory scheduling prototype #41147

Draft · wants to merge 14 commits into base: master
28 changes: 28 additions & 0 deletions python/ray/_private/accelerators/accelerator.py
@@ -136,3 +136,31 @@ def get_ec2_instance_accelerator_type(
Return None if it's unknown.
"""
return None

@staticmethod
def get_current_node_accelerator_memory() -> int:
"""Get the total number of accelerators of this family on the current node.

Returns:
The detected total number of accelerators of this family.
Return 0 if the current node doesn't contain accelerators of this family.
"""
return 0

@staticmethod
def get_ec2_instance_accelerator_memory(
instance_type: str, instances: dict
) -> Optional[int]:
"""Get the accelerator total memory of this family on the given ec2 instance type.

Args:
instance_type: The ec2 instance type.
instances: Map from ec2 instance type to instance metadata returned by
ec2 `describe-instance-types`.

Returns:
The accelerator total memory of this family in bytes on the ec2 instance
with given type.
Return None if it's unknown.
"""
return None
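For orientation, here is a rough sketch of how these new base-class hooks could be queried for every registered accelerator family. It assumes the `get_all_accelerator_resource_names` and `get_accelerator_manager_for_resource` helpers exposed by `ray._private.accelerators` on master; it is illustrative, not part of the PR.

```python
import ray._private.accelerators as accelerators

# Print the per-device memory each accelerator manager reports for this node.
for resource_name in accelerators.get_all_accelerator_resource_names():
    manager = accelerators.get_accelerator_manager_for_resource(resource_name)
    per_device_memory = manager.get_current_node_accelerator_memory()
    # Value is in bytes; 0 means no accelerator of this family was detected.
    print(resource_name, per_device_memory)
```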
4 changes: 4 additions & 0 deletions python/ray/_private/accelerators/intel_gpu.py
@@ -101,3 +101,7 @@ def set_current_process_visible_accelerator_ids(
os.environ[
IntelGPUAcceleratorManager.get_visible_accelerator_ids_env_var()
] = prefix + ",".join([str(i) for i in visible_xpu_devices])

@staticmethod
def get_current_node_accelerator_memory() -> int:
return 0
28 changes: 28 additions & 0 deletions python/ray/_private/accelerators/nvidia_gpu.py
@@ -122,3 +122,31 @@ def get_ec2_instance_accelerator_type(
assert len(gpus) == 1
return gpus[0]["Name"]
return None

@staticmethod
def get_current_node_accelerator_memory() -> int:
try:
pynvml.nvmlInit()
except pynvml.NVMLError:
return 0 # pynvml init failed
device_count = pynvml.nvmlDeviceGetCount()
cuda_device_memory = 0
if device_count > 0:
# Assumes a homogeneous node: report the total memory of the first GPU.
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
cuda_device_memory = int(pynvml.nvmlDeviceGetMemoryInfo(handle).total)
pynvml.nvmlShutdown()
return cuda_device_memory

@staticmethod
def get_ec2_instance_accelerator_memory(
instance_type: str, instances: dict
) -> Optional[int]:
if instance_type not in instances:
return None

gpus = instances[instance_type].get("GpuInfo", {}).get("Gpus")
if gpus is not None:
# TODO(ameer): currently we support one gpu type per node.
assert len(gpus) == 1
return int(gpus[0]["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024
return None
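For intuition, a standalone pynvml snippet that mirrors the calls above and prints what the NVIDIA hook would report; it assumes pynvml is installed and at least one GPU is visible.

```python
import pynvml

pynvml.nvmlInit()
try:
    if pynvml.nvmlDeviceGetCount() > 0:
        handle = pynvml.nvmlDeviceGetHandleByIndex(0)
        # Same value the hook returns: total memory of the first GPU, in bytes.
        total_bytes = int(pynvml.nvmlDeviceGetMemoryInfo(handle).total)
        print(f"GPU 0 total memory: {total_bytes / 1024**3:.1f} GiB")
finally:
    pynvml.nvmlShutdown()
```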
5 changes: 4 additions & 1 deletion python/ray/_private/node.py
@@ -507,6 +507,7 @@ def merge_resources(env_dict, params_dict):
num_cpus = env_dict.pop("CPU", None)
num_gpus = env_dict.pop("GPU", None)
memory = env_dict.pop("memory", None)
gpu_memory = env_dict.pop("gpu_memory", None)
object_store_memory = env_dict.pop("object_store_memory", None)

result = params_dict.copy()
@@ -518,7 +519,7 @@ def merge_resources(env_dict, params_dict):
"Autoscaler is overriding your resource:"
f"{key}: {params_dict[key]} with {env_dict[key]}."
)
return num_cpus, num_gpus, memory, object_store_memory, result
return num_cpus, num_gpus, memory, gpu_memory, object_store_memory, result

if not self._resource_spec:
env_resources = {}
@@ -534,13 +535,15 @@ def merge_resources(env_dict, params_dict):
num_cpus,
num_gpus,
memory,
gpu_memory,
object_store_memory,
resources,
) = merge_resources(env_resources, self._ray_params.resources)
self._resource_spec = ResourceSpec(
self._ray_params.num_cpus if num_cpus is None else num_cpus,
self._ray_params.num_gpus if num_gpus is None else num_gpus,
self._ray_params.memory if memory is None else memory,
self._ray_params._gpu_memory if gpu_memory is None else gpu_memory,
self._ray_params.object_store_memory
if object_store_memory is None
else object_store_memory,
6 changes: 6 additions & 0 deletions python/ray/_private/parameter.py
@@ -33,6 +33,7 @@ class RayParams:
of that resource available.
labels: The key-value labels of the node.
memory: Total available memory for workers requesting memory.
_gpu_memory: Total available memory in bytes across all GPUs on the node.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
redis_max_memory: The max amount of memory (in bytes) to allow redis
@@ -141,6 +142,7 @@ def __init__(
resources: Optional[Dict[str, float]] = None,
labels: Optional[Dict[str, str]] = None,
memory: Optional[float] = None,
_gpu_memory: Optional[float] = None,
object_store_memory: Optional[float] = None,
redis_max_memory: Optional[float] = None,
redis_port: Optional[int] = None,
@@ -198,6 +200,7 @@ def __init__(
self.num_cpus = num_cpus
self.num_gpus = num_gpus
self.memory = memory
self._gpu_memory = _gpu_memory
self.object_store_memory = object_store_memory
self.resources = resources
self.redis_max_memory = redis_max_memory
@@ -439,6 +442,9 @@ def build_error(resource, alternative):
assert "CPU" not in self.resources, build_error("CPU", "num_cpus")
assert "GPU" not in self.resources, build_error("GPU", "num_gpus")
assert "memory" not in self.resources, build_error("memory", "memory")
assert "gpu_memory" not in self.resources, build_error(
"gpu_memory", "_gpu_memory"
)
assert "object_store_memory" not in self.resources, build_error(
"object_store_memory", "object_store_memory"
)
1 change: 1 addition & 0 deletions python/ray/_private/ray_option_utils.py
@@ -125,6 +125,7 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
"name": Option((str, type(None))),
"num_cpus": _resource_option("num_cpus"),
"num_gpus": _resource_option("num_gpus"),
"_gpu_memory": _resource_option("_gpu_memory"),
"object_store_memory": _counting_option("object_store_memory", False),
# TODO(suquark): "placement_group", "placement_group_bundle_index"
# and "placement_group_capture_child_tasks" are deprecated,
15 changes: 15 additions & 0 deletions python/ray/_private/resource_spec.py
@@ -23,6 +23,7 @@ class ResourceSpec(
"num_cpus",
"num_gpus",
"memory",
"gpu_memory",
"object_store_memory",
"resources",
"redis_max_memory",
@@ -39,6 +40,7 @@ class ResourceSpec(
num_cpus: The CPUs allocated for this raylet.
num_gpus: The GPUs allocated for this raylet.
memory: The memory allocated for this raylet.
gpu_memory: The total GPU memory allocated for this raylet.
object_store_memory: The object store memory allocated for this raylet.
Note that when calling to_resource_dict(), this will be scaled down
by 30% to account for the global plasma LRU reserve.
@@ -55,6 +57,7 @@ def __new__(
num_cpus=None,
num_gpus=None,
memory=None,
gpu_memory=None,
object_store_memory=None,
resources=None,
redis_max_memory=None,
@@ -64,6 +67,7 @@ def __new__(
num_cpus,
num_gpus,
memory,
gpu_memory,
object_store_memory,
resources,
redis_max_memory,
@@ -89,6 +93,7 @@ def to_resource_dict(self):
CPU=self.num_cpus,
GPU=self.num_gpus,
memory=int(self.memory),
gpu_memory=int(self.gpu_memory),
object_store_memory=int(self.object_store_memory),
)

@@ -141,6 +146,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
assert "CPU" not in resources, resources
assert "GPU" not in resources, resources
assert "memory" not in resources, resources
assert "gpu_memory" not in resources, resources
assert "object_store_memory" not in resources, resources

if node_ip_address is None:
@@ -165,6 +171,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
num_cpus = ray._private.utils.get_num_cpus()

num_gpus = 0
gpu_memory = 0
for (
accelerator_resource_name
) in ray._private.accelerators.get_all_accelerator_resource_names():
@@ -208,6 +215,13 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
if num_accelerators:
if accelerator_resource_name == "GPU":
num_gpus = num_accelerators
gpu_memory = num_accelerators * (
self.gpu_memory
if self.gpu_memory
else accelerator_manager.get_current_node_accelerator_memory()
)
else:
resources[accelerator_resource_name] = num_accelerators

@@ -298,6 +312,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
num_cpus,
num_gpus,
memory,
gpu_memory,
object_store_memory,
resources,
redis_max_memory,
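To make the arithmetic in `resolve()` concrete, here is a small hedged example with made-up numbers; the 4 × 16 GiB node is illustrative only.

```python
# Example values, not from the PR: a node with 4 GPUs of 16 GiB each.
num_accelerators = 4
per_gpu_memory = 16 * 1024**3  # bytes, as returned by get_current_node_accelerator_memory()

# When the user does not pass _gpu_memory, the node advertises the detected total.
gpu_memory = num_accelerators * per_gpu_memory
assert gpu_memory == 4 * 16 * 1024**3  # 64 GiB, in bytes
```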
13 changes: 10 additions & 3 deletions python/ray/_private/utils.py
@@ -361,15 +361,20 @@ def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
raise ValueError(
"The resources dictionary must not contain the key 'CPU' or 'GPU'"
)
elif "memory" in resources or "object_store_memory" in resources:
elif (
"memory" in resources
or "object_store_memory" in resources
or "_gpu_memory" in resources
):
raise ValueError(
"The resources dictionary must not "
"contain the key 'memory' or 'object_store_memory'"
"The resources dictionary must not contain the key"
" 'memory' or 'object_store_memory' or '_gpu_memory'"
)

num_cpus = options_dict.get("num_cpus")
num_gpus = options_dict.get("num_gpus")
memory = options_dict.get("memory")
gpu_memory = options_dict.get("_gpu_memory")
object_store_memory = options_dict.get("object_store_memory")
accelerator_type = options_dict.get("accelerator_type")

@@ -381,6 +386,8 @@ def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
resources["memory"] = int(memory)
if object_store_memory is not None:
resources["object_store_memory"] = object_store_memory
if gpu_memory is not None:
resources["gpu_memory"] = int(gpu_memory)
if accelerator_type is not None:
resources[
f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"
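A rough sketch of how the new option flows through `resources_from_ray_options` on this branch; the exact contents of the returned dict depend on the other options passed, so treat the printed value as an expectation, not a guarantee.

```python
from ray._private.utils import resources_from_ray_options

# _gpu_memory from .options()/@ray.remote is translated into a "gpu_memory"
# entry of the internal resource request (values in bytes).
resources = resources_from_ray_options({"_gpu_memory": 2 * 1024**3})
print(resources)  # expected to include {"gpu_memory": 2147483648}
```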
12 changes: 12 additions & 0 deletions python/ray/_private/worker.py
@@ -1157,6 +1157,7 @@ def init(
resources: Optional[Dict[str, float]] = None,
labels: Optional[Dict[str, str]] = None,
object_store_memory: Optional[int] = None,
_gpu_memory: Optional[int] = None,
local_mode: bool = False,
ignore_reinit_error: bool = False,
include_dashboard: Optional[bool] = None,
@@ -1287,6 +1288,8 @@ def init(
_driver_object_store_memory: Deprecated.
_memory: Amount of reservable memory resource in bytes rounded
down to the nearest integer.
_gpu_memory: Total GPU memory in bytes available on this node, summed
across all GPUs. If not provided, it is auto-detected from the
node's GPUs.
_redis_password: Prevents external clients without the password
from connecting to Redis if provided.
_temp_dir: If provided, specifies the root temporary
@@ -1562,6 +1565,7 @@ def sigterm_handler(signum, frame):
dashboard_host=dashboard_host,
dashboard_port=dashboard_port,
memory=_memory,
_gpu_memory=_gpu_memory,
object_store_memory=object_store_memory,
redis_max_memory=_redis_max_memory,
plasma_store_socket_name=None,
@@ -1590,6 +1594,11 @@ def sigterm_handler(signum, frame):
"When connecting to an existing cluster, num_cpus "
"and num_gpus must not be provided."
)
if _gpu_memory is not None:
raise ValueError(
"When connecting to an existing cluster, "
"_gpu_memory must not be provided."
)
if resources is not None:
raise ValueError(
"When connecting to an existing cluster, "
@@ -3127,6 +3136,7 @@ def remote(
resources: Dict[str, float] = Undefined,
accelerator_type: str = Undefined,
memory: Union[int, float] = Undefined,
_gpu_memory: Union[int, float] = Undefined,
max_calls: int = Undefined,
max_restarts: int = Undefined,
max_task_retries: int = Undefined,
@@ -3296,6 +3306,8 @@ def method(self):
See :ref:`accelerator types <accelerator_types>`.
memory: The heap memory request in bytes for this task/actor,
rounded down to the nearest integer.
_gpu_memory: The GPU memory request in bytes for this task/actor,
allocated from a single GPU and rounded up to the nearest integer.
max_calls: Only for *remote functions*. This specifies the
maximum number of times that a given worker can execute
the given remote function before it must exit
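An end-to-end sketch of the user-facing API this PR prototypes; the 2 GiB figure is illustrative, and the snippet only works on a branch that carries this change.

```python
import ray

ray.init()  # on a GPU node, gpu_memory is auto-detected (see resource_spec.py above)

@ray.remote(_gpu_memory=2 * 1024**3)  # reserve 2 GiB of GPU memory from one GPU
def embed(batch):
    # While this task runs, 2 GiB is deducted from the node's gpu_memory resource.
    return len(batch)

print(ray.get(embed.remote(["a", "b", "c"])))
```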
12 changes: 12 additions & 0 deletions python/ray/actor.py
@@ -350,6 +350,7 @@ class _ActorClassMetadata:
num_gpus: The default number of GPUs required by the actor creation
task.
memory: The heap memory quota for this actor.
_gpu_memory: The gpu memory request in bytes for this actor.
resources: The default resources required by the actor creation task.
accelerator_type: The specified type of accelerator required for the
node on which this actor runs.
@@ -376,6 +377,7 @@ def __init__(
num_cpus,
num_gpus,
memory,
_gpu_memory,
object_store_memory,
resources,
accelerator_type,
@@ -394,6 +396,7 @@ def __init__(
self.num_cpus = num_cpus
self.num_gpus = num_gpus
self.memory = memory
self._gpu_memory = _gpu_memory
self.object_store_memory = object_store_memory
self.resources = resources
self.accelerator_type = accelerator_type
@@ -595,6 +598,8 @@ def options(self, **actor_options):
See :ref:`accelerator types <accelerator_types>`.
memory: The heap memory request in bytes for this task/actor,
rounded down to the nearest integer.
_gpu_memory: The GPU memory request in bytes for this task/actor,
allocated from a single GPU and rounded up to the nearest integer.
object_store_memory: The object store memory request for actors only.
max_restarts: This specifies the maximum
number of times that the actor should be restarted when it dies
@@ -718,6 +723,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
num_cpus: The number of CPUs required by the actor creation task.
num_gpus: The number of GPUs required by the actor creation task.
memory: Restrict the heap memory usage of this actor.
_gpu_memory: Restrict the gpu memory usage of this actor.
resources: The custom resources required by the actor creation
task.
max_concurrency: The max number of concurrent calls to allow for
@@ -1424,6 +1430,12 @@ def _make_actor(cls, actor_options):
Class = _modify_class(cls)
_inject_tracing_into_class(Class)

if actor_options.get("_gpu_memory", None) and actor_options.get("num_gpus", None):
raise ValueError(
"Specifying both `num_gpus` and `_gpu_memory` is not allowed. "
"See more at: (link TBD)"
)

if "max_restarts" in actor_options:
if actor_options["max_restarts"] != -1: # -1 represents infinite restart
# Make sure we don't pass too big of an int to C++, causing
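To illustrate the new validation in `_make_actor`, a hedged example of the failure mode it guards against; the 8 GiB value is arbitrary.

```python
import ray

# Requesting both whole GPUs and a GPU-memory amount is rejected on this branch:
@ray.remote(num_gpus=1, _gpu_memory=8 * 1024**3)
class Trainer:
    pass
# -> ValueError: Specifying both `num_gpus` and `_gpu_memory` is not allowed.
```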
13 changes: 13 additions & 0 deletions python/ray/autoscaler/_private/aws/node_provider.py
@@ -662,6 +662,19 @@ def fillout_available_node_types_resources(
autodetected_resources[
f"accelerator_type:{accelerator_type}"
] = 1
# autodetect gpu memory
gpu_memory = (
accelerator_manager.get_ec2_instance_accelerator_memory(
instance_type, instances_dict
)
)
if (
accelerator_manager.get_resource_name() == "GPU"
and gpu_memory
):
autodetected_resources["gpu_memory"] = (
num_accelerators * gpu_memory
)

autodetected_resources.update(
available_node_types[node_type].get("resources", {})
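A hedged worked example of what the AWS provider would now autodetect. The p3.2xlarge figures (one V100 with 16 GiB) mirror EC2's `describe-instance-types` output and are shown only for illustration.

```python
# Hypothetical slice of `describe-instance-types` output for one instance type.
instances_dict = {
    "p3.2xlarge": {
        "GpuInfo": {
            "Gpus": [{"Name": "V100", "Count": 1, "MemoryInfo": {"SizeInMiB": 16384}}]
        }
    }
}

num_accelerators = 1
gpu_memory = 16384 * 1024 * 1024  # SizeInMiB -> bytes, as in get_ec2_instance_accelerator_memory
autodetected_resources = {"GPU": num_accelerators, "gpu_memory": num_accelerators * gpu_memory}
print(autodetected_resources)  # {'GPU': 1, 'gpu_memory': 17179869184}
```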
@@ -316,6 +316,7 @@ def create_node_with_resources_and_labels(
num_cpus=resources.pop("CPU", 0),
num_gpus=resources.pop("GPU", 0),
object_store_memory=resources.pop("object_store_memory", None),
_gpu_memory=resources.pop("gpu_memory", 0),
resources=resources,
labels=labels,
redis_address="{}:6379".format(