From 7725af7eb74fe6a418d671ec185bd002189e5b4c Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Tue, 14 Nov 2023 17:29:56 -0800 Subject: [PATCH 01/14] gpu memory prototype init Signed-off-by: Jonathan Nitisastro --- python/ray/_private/accelerators/intel_gpu.py | 4 ++ .../ray/_private/accelerators/nvidia_gpu.py | 4 ++ python/ray/_private/node.py | 7 ++- python/ray/_private/parameter.py | 6 +++ python/ray/_private/resource_spec.py | 10 ++++ .../scheduling/cluster_resource_data.cc | 47 +++++++++++++++++-- .../common/scheduling/cluster_resource_data.h | 22 +++++++++ .../scheduling/resource_instance_set.cc | 10 +++- src/ray/common/scheduling/scheduling_ids.h | 8 +++- .../scheduling/local_resource_manager.cc | 6 ++- 10 files changed, 115 insertions(+), 9 deletions(-) diff --git a/python/ray/_private/accelerators/intel_gpu.py b/python/ray/_private/accelerators/intel_gpu.py index bd6f1c0fcbb14..9bf6042f7f758 100644 --- a/python/ray/_private/accelerators/intel_gpu.py +++ b/python/ray/_private/accelerators/intel_gpu.py @@ -101,3 +101,7 @@ def set_current_process_visible_accelerator_ids( os.environ[ IntelGPUAcceleratorManager.get_visible_accelerator_ids_env_var() ] = prefix + ",".join([str(i) for i in visible_xpu_devices]) + + @staticmethod + def get_current_node_gpu_memory() -> int: + return 0 diff --git a/python/ray/_private/accelerators/nvidia_gpu.py b/python/ray/_private/accelerators/nvidia_gpu.py index 9d776221b9e2a..e4ed0aec6a7ce 100644 --- a/python/ray/_private/accelerators/nvidia_gpu.py +++ b/python/ray/_private/accelerators/nvidia_gpu.py @@ -122,3 +122,7 @@ def get_ec2_instance_accelerator_type( assert len(gpus) == 1 return gpus[0]["Name"] return None + + @staticmethod + def get_current_node_gpu_memory() -> int: + return 1000 diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index a40d61f4c5ac1..20ed31e322c92 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -507,6 +507,7 @@ def merge_resources(env_dict, params_dict): num_cpus = env_dict.pop("CPU", None) num_gpus = env_dict.pop("GPU", None) memory = env_dict.pop("memory", None) + gpu_memory = env_dict.pop("gpu_memory", None) object_store_memory = env_dict.pop("object_store_memory", None) result = params_dict.copy() @@ -518,7 +519,7 @@ def merge_resources(env_dict, params_dict): "Autoscaler is overriding your resource:" f"{key}: {params_dict[key]} with {env_dict[key]}." 
) - return num_cpus, num_gpus, memory, object_store_memory, result + return num_cpus, num_gpus, memory, gpu_memory, object_store_memory, result if not self._resource_spec: env_resources = {} @@ -534,13 +535,17 @@ def merge_resources(env_dict, params_dict): num_cpus, num_gpus, memory, + gpu_memory, object_store_memory, resources, ) = merge_resources(env_resources, self._ray_params.resources) + if num_gpus and gpu_memory and num_gpus != len(gpu_memory): + raise ValueError(f"Number of gpus specified: {gpu_memory} does not match specified num_gpus: {num_gpus}") self._resource_spec = ResourceSpec( self._ray_params.num_cpus if num_cpus is None else num_cpus, self._ray_params.num_gpus if num_gpus is None else num_gpus, self._ray_params.memory if memory is None else memory, + self._ray_params.gpu_memory if gpu_memory is None else gpu_memory, self._ray_params.object_store_memory if object_store_memory is None else object_store_memory, diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 1770dd2fa00d9..9c354176a2e97 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -33,6 +33,7 @@ class RayParams: of that resource available. labels: The key-value labels of the node. memory: Total available memory for workers requesting memory. + gpu_memory: Total available memory for each gpu from num_gpus. object_store_memory: The amount of memory (in bytes) to start the object store with. redis_max_memory: The max amount of memory (in bytes) to allow redis @@ -141,6 +142,7 @@ def __init__( resources: Optional[Dict[str, float]] = None, labels: Optional[Dict[str, str]] = None, memory: Optional[float] = None, + gpu_memory: Optional[List[float]] = None, object_store_memory: Optional[float] = None, redis_max_memory: Optional[float] = None, redis_port: Optional[int] = None, @@ -198,6 +200,7 @@ def __init__( self.num_cpus = num_cpus self.num_gpus = num_gpus self.memory = memory + self.gpu_memory = gpu_memory self.object_store_memory = object_store_memory self.resources = resources self.redis_max_memory = redis_max_memory @@ -439,6 +442,9 @@ def build_error(resource, alternative): assert "CPU" not in self.resources, build_error("CPU", "num_cpus") assert "GPU" not in self.resources, build_error("GPU", "num_gpus") assert "memory" not in self.resources, build_error("memory", "memory") + assert "gpu_memory" not in self.resources, build_error( + "gpu_memory", "gpu_memory" + ) assert "object_store_memory" not in self.resources, build_error( "object_store_memory", "object_store_memory" ) diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 8b7d3728b9f62..b96747e702519 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -23,6 +23,7 @@ class ResourceSpec( "num_cpus", "num_gpus", "memory", + "gpu_memory", "object_store_memory", "resources", "redis_max_memory", @@ -39,6 +40,7 @@ class ResourceSpec( num_cpus: The CPUs allocated for this raylet. num_gpus: The GPUs allocated for this raylet. memory: The memory allocated for this raylet. + gpu_memory: The GPUs' memory allocated for this raylet. object_store_memory: The object store memory allocated for this raylet. Note that when calling to_resource_dict(), this will be scaled down by 30% to account for the global plasma LRU reserve. 
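
The aggregate value that ends up in to_resource_dict() is simply the per-GPU capacity multiplied by the GPU count. A minimal Python sketch of the resulting resource dict under this first prototype, with illustrative numbers only (the 2 GPUs, 16000 MB per GPU, and the CPU/memory figures are assumptions, not values from the patch; later commits in this series pin the unit to megabytes):

# Assumed node: 2 GPUs with 16000 MB each; only the key names come from this patch.
num_gpus = 2
per_gpu_memory_mb = 16000                 # e.g. what get_current_node_gpu_memory() would report
gpu_memory = num_gpus * per_gpu_memory_mb

resource_dict = {
    "CPU": 8,                             # assumed
    "GPU": num_gpus,
    "memory": 32 * 1024**3,               # assumed heap memory, in bytes
    "gpu_memory": gpu_memory,             # new aggregate resource introduced by this patch
    "object_store_memory": 10 * 1024**3,  # assumed, in bytes
}
print(resource_dict["gpu_memory"])        # 32000
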
@@ -55,6 +57,7 @@ def __new__( num_cpus=None, num_gpus=None, memory=None, + gpu_memory=None, object_store_memory=None, resources=None, redis_max_memory=None, @@ -64,6 +67,7 @@ def __new__( num_cpus, num_gpus, memory, + gpu_memory, object_store_memory, resources, redis_max_memory, @@ -89,6 +93,7 @@ def to_resource_dict(self): CPU=self.num_cpus, GPU=self.num_gpus, memory=int(self.memory), + gpu_memory=int(self.gpu_memory), object_store_memory=int(self.object_store_memory), ) @@ -141,6 +146,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): assert "CPU" not in resources, resources assert "GPU" not in resources, resources assert "memory" not in resources, resources + assert "gpu_memory" not in resources, resources assert "object_store_memory" not in resources, resources if node_ip_address is None: @@ -165,6 +171,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): num_cpus = ray._private.utils.get_num_cpus() num_gpus = 0 + gpu_memory = 0 for ( accelerator_resource_name ) in ray._private.accelerators.get_all_accelerator_resource_names(): @@ -208,6 +215,8 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if num_accelerators: if accelerator_resource_name == "GPU": num_gpus = num_accelerators + gpu_memory = num_accelerators * (self.gpu_memory if self.gpu_memory else accelerator_manager.get_current_node_gpu_memory()) + resources["gpu_memory"] = gpu_memory else: resources[accelerator_resource_name] = num_accelerators @@ -298,6 +307,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): num_cpus, num_gpus, memory, + gpu_memory, object_store_memory, resources, redis_max_memory, diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc index 0e66316ba2808..d938b1e41c6b1 100644 --- a/src/ray/common/scheduling/cluster_resource_data.cc +++ b/src/ray/common/scheduling/cluster_resource_data.cc @@ -92,17 +92,18 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request, RAY_LOG(DEBUG) << "At pull manager capacity"; return false; } - + const ResourceSet resource_request_adjusted = this->ProcessRelativeResource(resource_request.GetResourceSet()); if (!this->normal_task_resources.IsEmpty()) { auto available_resources = this->available; available_resources -= this->normal_task_resources; - return available_resources >= resource_request.GetResourceSet(); + return available_resources >= resource_request_adjusted; } - return this->available >= resource_request.GetResourceSet(); + return this->available >= resource_request_adjusted; } bool NodeResources::IsFeasible(const ResourceRequest &resource_request) const { - return this->total >= resource_request.GetResourceSet(); + const ResourceSet resource_request_adjusted = this->ProcessRelativeResource(resource_request.GetResourceSet()); + return this->total >= resource_request_adjusted; } bool NodeResources::operator==(const NodeResources &other) const { @@ -126,6 +127,25 @@ std::string NodeResources::DebugString() const { return buffer.str(); } +const ResourceSet NodeResources::ProcessRelativeResource(const ResourceSet &resource) const { + ResourceSet adjusted_resource = resource; + // adjust num_gpus <=> gpu_memory conversion + if(resource.Has(ResourceID::GPU())){ + double gpu_mem_request = resource.Get(ResourceID::GPU()).Double() * + (this->total.Get(ResourceID::GPU_Memory()).Double() / this->total.Get(ResourceID::GPU()).Double()); + adjusted_resource.Set(ResourceID::GPU_Memory(), 
gpu_mem_request); + } else if (resource.Has(ResourceID::GPU_Memory())){ + double total_gpu_mem = this->total.Get(ResourceID::GPU_Memory()).Double(); + double num_gpus_request = 0; + if(total_gpu_mem > 0){ + num_gpus_request = resource.Get(ResourceID::GPU_Memory()).Double() * + (this->total.Get(ResourceID::GPU()).Double() / total_gpu_mem); + } + adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); + } + return adjusted_resource; +} + std::string NodeResources::DictString() const { return DebugString(); } bool NodeResourceInstances::operator==(const NodeResourceInstances &other) { @@ -153,4 +173,23 @@ const NodeResourceInstanceSet &NodeResourceInstances::GetTotalResourceInstances( return this->total; }; +const ResourceSet NodeResourceInstances::ProcessRelativeResource(const ResourceSet &resource) const { + ResourceSet adjusted_resource = resource; + // adjust num_gpus <=> gpu_memory conversion + if(resource.Has(ResourceID::GPU())){ + double gpu_mem_request = resource.Get(ResourceID::GPU()).Double() * + (this->total.Get(ResourceID::GPU_Memory())[0].Double() / this->total.Get(ResourceID::GPU())[0].Double()); + adjusted_resource.Set(ResourceID::GPU_Memory(), gpu_mem_request); + } else if (resource.Has(ResourceID::GPU_Memory())){ + double total_gpu_mem = this->total.Get(ResourceID::GPU_Memory())[0].Double(); + double num_gpus_request = 0; + if(total_gpu_mem > 0){ + num_gpus_request = resource.Get(ResourceID::GPU_Memory()).Double() * + (this->total.Get(ResourceID::GPU())[0].Double() / total_gpu_mem); + } + adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); + } + return adjusted_resource; +}; + } // namespace ray diff --git a/src/ray/common/scheduling/cluster_resource_data.h b/src/ray/common/scheduling/cluster_resource_data.h index 45312e012c0aa..eb4bf55fbce1a 100644 --- a/src/ray/common/scheduling/cluster_resource_data.h +++ b/src/ray/common/scheduling/cluster_resource_data.h @@ -119,6 +119,21 @@ class ResourceRequest { bool requires_object_store_memory_ = false; }; +// update this and resource instance set, +// two options, both same idea: map derived resource id to vector of resource id + +// option 1: update in taskresource instance, resourceinstance set to override Set, add, etc for derived resource id +// for example Set(DerivedNodeID , abs::flat_hash_map> base_resources) +// how to not expose the base resource?? + +// option 2: map derived resource id to its base resource ids outside Resource instance +// problem: Tryallocate is hard to handle, esp need same index for all 3 resources +// + need to free all when failed allocating + +// note: both no need to store DerivedResourceID => vector, as all stored as base resource id + +// we might need to check if gpu_memory and gpu both exists here?? + /// Represents a resource set that contains the per-instance resource values. /// NOTE, unlike ResourceRequest, zero values won't be automatically removed in this /// class. Because otherwise we will lose the number of instances the set originally had @@ -333,6 +348,9 @@ class NodeResources { std::string DebugString() const; /// Returns compact dict-like string. std::string DictString() const; + // Returns adjusted ResourceSet after converting resource relative to others. + // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory. + const ResourceSet ProcessRelativeResource(const ResourceSet &resource) const; }; /// Total and available capacities of each resource instance. 
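
The GPU <-> gpu_memory conversion above is easiest to follow with concrete numbers. Below is a minimal Python rendering of ProcessRelativeResource() for a node whose totals are 4 GPUs and 64000 MB of gpu_memory; the totals and the dict-based representation are illustrative assumptions, the real implementation is the C++ hunk above operating on ResourceSet, and later commits in this series rename it to ConvertRelativeResource and convert only in the gpu_memory-to-GPU direction:

# Illustrative node totals; the ratio arithmetic mirrors the hunk above.
total_gpus = 4.0
total_gpu_memory = 64000.0                # MB

def process_relative_resource(request: dict) -> dict:
    adjusted = dict(request)
    if "GPU" in request:
        # A GPU request implies a proportional share of the node's total gpu_memory.
        adjusted["gpu_memory"] = request["GPU"] * (total_gpu_memory / total_gpus)
    elif "gpu_memory" in request and total_gpu_memory > 0:
        # A gpu_memory request is translated into a (possibly fractional) GPU request.
        adjusted["GPU"] = request["gpu_memory"] * (total_gpus / total_gpu_memory)
    return adjusted

print(process_relative_resource({"GPU": 1}))            # {'GPU': 1, 'gpu_memory': 16000.0}
print(process_relative_resource({"gpu_memory": 8000}))  # {'gpu_memory': 8000, 'GPU': 0.5}
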
@@ -351,6 +369,10 @@ class NodeResourceInstances { bool operator==(const NodeResourceInstances &other); /// Returns human-readable string for these resources. [[nodiscard]] std::string DebugString() const; + + // Returns adjusted ResourceSet after converting resource relative to others. + // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory. + const ResourceSet ProcessRelativeResource(const ResourceSet &resource) const; }; struct Node { diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 976deee39383e..9e1e891747db2 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -22,7 +22,8 @@ namespace ray { NodeResourceInstanceSet::NodeResourceInstanceSet(const NodeResourceSet &total) { - for (auto &resource_id : total.ExplicitResourceIds()) { + auto resource_ids = total.ExplicitResourceIds(); + for (auto &resource_id : resource_ids) { std::vector instances; auto value = total.Get(resource_id); if (resource_id.IsUnitInstanceResource()) { @@ -30,6 +31,13 @@ NodeResourceInstanceSet::NodeResourceInstanceSet(const NodeResourceSet &total) { for (size_t i = 0; i < num_instances; i++) { instances.push_back(1.0); }; + } else if(resource_id == ResourceID::GPU_Memory() && + resource_ids.find(ResourceID::GPU()) != resource_ids.end()){ + double num_gpus = total.Get(ResourceID::GPU()).Double(); + for (size_t i = 0; i < static_cast(num_gpus); i++) { + instances.push_back(value.Double() / num_gpus); + }; + } else { instances.push_back(value); } diff --git a/src/ray/common/scheduling/scheduling_ids.h b/src/ray/common/scheduling/scheduling_ids.h index 42799064c9cbf..7bdc81c0e1739 100644 --- a/src/ray/common/scheduling/scheduling_ids.h +++ b/src/ray/common/scheduling/scheduling_ids.h @@ -36,6 +36,7 @@ enum PredefinedResourcesEnum { CPU, MEM, GPU, + GPU_MEM, OBJECT_STORE_MEM, PredefinedResourcesEnum_MAX }; @@ -44,6 +45,7 @@ const std::string kCPU_ResourceLabel = "CPU"; const std::string kGPU_ResourceLabel = "GPU"; const std::string kObjectStoreMemory_ResourceLabel = "object_store_memory"; const std::string kMemory_ResourceLabel = "memory"; +const std::string kGPU_Memory_ResourceLabel = "gpu_memory"; const std::string kBundle_ResourceLabel = "bundle"; /// Class to map string IDs to unique integer IDs and back. @@ -149,7 +151,8 @@ inline StringIdMap &BaseSchedulingID::GetMap() { map->InsertOrDie(kCPU_ResourceLabel, CPU) .InsertOrDie(kGPU_ResourceLabel, GPU) .InsertOrDie(kObjectStoreMemory_ResourceLabel, OBJECT_STORE_MEM) - .InsertOrDie(kMemory_ResourceLabel, MEM); + .InsertOrDie(kMemory_ResourceLabel, MEM) + .InsertOrDie(kGPU_Memory_ResourceLabel, GPU_MEM); return map; }()}; return *map; @@ -184,6 +187,9 @@ class ResourceID : public BaseSchedulingID { /// Resource ID of GPU. static ResourceID GPU() { return ResourceID(PredefinedResourcesEnum::GPU); } + /// Resource ID of GPU memory. + static ResourceID GPU_Memory() { return ResourceID(PredefinedResourcesEnum::GPU_MEM); } + /// Resource ID of object store memory. 
static ResourceID ObjectStoreMemory() { return ResourceID(PredefinedResourcesEnum::OBJECT_STORE_MEM); diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index 25eb0eef0c658..4695f9c2c8875 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -80,11 +80,13 @@ bool LocalResourceManager::AllocateTaskResourceInstances( const ResourceRequest &resource_request, std::shared_ptr task_allocation) { RAY_CHECK(task_allocation != nullptr); + const ResourceSet adjusted_resource_request = local_resources_.ProcessRelativeResource(resource_request.GetResourceSet()); + // add adjust_gpu_memory here, added to NodeInstanceResourceSet auto allocation = - local_resources_.available.TryAllocate(resource_request.GetResourceSet()); + local_resources_.available.TryAllocate(adjusted_resource_request); if (allocation) { *task_allocation = TaskResourceInstances(*allocation); - for (const auto &resource_id : resource_request.ResourceIds()) { + for (const auto &resource_id : adjusted_resource_request.ResourceIds()) { SetResourceNonIdle(resource_id); } return true; From 401b595b026618b222c296e5c730cc4da87b3f0d Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Wed, 15 Nov 2023 13:27:18 -0800 Subject: [PATCH 02/14] convert gpu memory to GPU in NodeResources Signed-off-by: Jonathan Nitisastro --- python/ray/_private/accelerators/intel_gpu.py | 2 +- .../ray/_private/accelerators/nvidia_gpu.py | 18 ++++- python/ray/_private/node.py | 2 - python/ray/_private/parameter.py | 8 +-- python/ray/_private/resource_spec.py | 11 ++- python/ray/scripts/scripts.py | 10 +++ .../scheduling/cluster_resource_data.cc | 71 ++++++++++++------- .../common/scheduling/cluster_resource_data.h | 16 +++-- .../scheduling/resource_instance_set.cc | 4 +- .../scheduling/local_resource_manager.cc | 6 +- 10 files changed, 99 insertions(+), 49 deletions(-) diff --git a/python/ray/_private/accelerators/intel_gpu.py b/python/ray/_private/accelerators/intel_gpu.py index 9bf6042f7f758..50968c8e5de28 100644 --- a/python/ray/_private/accelerators/intel_gpu.py +++ b/python/ray/_private/accelerators/intel_gpu.py @@ -101,7 +101,7 @@ def set_current_process_visible_accelerator_ids( os.environ[ IntelGPUAcceleratorManager.get_visible_accelerator_ids_env_var() ] = prefix + ",".join([str(i) for i in visible_xpu_devices]) - + @staticmethod def get_current_node_gpu_memory() -> int: return 0 diff --git a/python/ray/_private/accelerators/nvidia_gpu.py b/python/ray/_private/accelerators/nvidia_gpu.py index e4ed0aec6a7ce..d871aad8081e4 100644 --- a/python/ray/_private/accelerators/nvidia_gpu.py +++ b/python/ray/_private/accelerators/nvidia_gpu.py @@ -122,7 +122,21 @@ def get_ec2_instance_accelerator_type( assert len(gpus) == 1 return gpus[0]["Name"] return None - + @staticmethod def get_current_node_gpu_memory() -> int: - return 1000 + try: + pynvml.nvmlInit() + except pynvml.NVMLError: + return None # pynvml init failed + device_count = pynvml.nvmlDeviceGetCount() + cuda_device_type = None + if device_count > 0: + handle = pynvml.nvmlDeviceGetHandleByIndex(0) + cuda_device_type = ( + NvidiaGPUAcceleratorManager._gpu_name_to_accelerator_type( + pynvml.nvmlDeviceGetName(handle) + ) + ) + pynvml.nvmlShutdown() + return cuda_device_type diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 20ed31e322c92..02b08b0c43cb9 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -539,8 +539,6 @@ 
def merge_resources(env_dict, params_dict): object_store_memory, resources, ) = merge_resources(env_resources, self._ray_params.resources) - if num_gpus and gpu_memory and num_gpus != len(gpu_memory): - raise ValueError(f"Number of gpus specified: {gpu_memory} does not match specified num_gpus: {num_gpus}") self._resource_spec = ResourceSpec( self._ray_params.num_cpus if num_cpus is None else num_cpus, self._ray_params.num_gpus if num_gpus is None else num_gpus, diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 9c354176a2e97..595344878a0e5 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -33,7 +33,7 @@ class RayParams: of that resource available. labels: The key-value labels of the node. memory: Total available memory for workers requesting memory. - gpu_memory: Total available memory for each gpu from num_gpus. + _gpu_memory: Total available memory from all GPUs in MB. object_store_memory: The amount of memory (in bytes) to start the object store with. redis_max_memory: The max amount of memory (in bytes) to allow redis @@ -142,7 +142,7 @@ def __init__( resources: Optional[Dict[str, float]] = None, labels: Optional[Dict[str, str]] = None, memory: Optional[float] = None, - gpu_memory: Optional[List[float]] = None, + _gpu_memory: Optional[float] = None, object_store_memory: Optional[float] = None, redis_max_memory: Optional[float] = None, redis_port: Optional[int] = None, @@ -200,7 +200,7 @@ def __init__( self.num_cpus = num_cpus self.num_gpus = num_gpus self.memory = memory - self.gpu_memory = gpu_memory + self._gpu_memory = _gpu_memory self.object_store_memory = object_store_memory self.resources = resources self.redis_max_memory = redis_max_memory @@ -443,7 +443,7 @@ def build_error(resource, alternative): assert "GPU" not in self.resources, build_error("GPU", "num_gpus") assert "memory" not in self.resources, build_error("memory", "memory") assert "gpu_memory" not in self.resources, build_error( - "gpu_memory", "gpu_memory" + "gpu_memory", "_gpu_memory" ) assert "object_store_memory" not in self.resources, build_error( "object_store_memory", "object_store_memory" diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index b96747e702519..b8ffcaa11ba80 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -40,7 +40,7 @@ class ResourceSpec( num_cpus: The CPUs allocated for this raylet. num_gpus: The GPUs allocated for this raylet. memory: The memory allocated for this raylet. - gpu_memory: The GPUs' memory allocated for this raylet. + gpu_memory: The total GPUs' memory allocated for this raylet. object_store_memory: The object store memory allocated for this raylet. Note that when calling to_resource_dict(), this will be scaled down by 30% to account for the global plasma LRU reserve. 
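
Since _gpu_memory is now a single node-wide total in megabytes rather than a per-GPU list, the per-GPU share is recovered by dividing by the GPU count, and an explicitly configured total takes precedence over detection. A small sketch mirroring the resolve() change in the next hunk (the 2 GPUs and 16000 MB per GPU are assumed figures; RayParams, --gpu-memory, and get_current_node_gpu_memory() come from this series):

# Precedence sketch: an explicitly configured total wins over auto-detection.
configured_total_mb = None                # e.g. RayParams(_gpu_memory=...) or `ray start --gpu-memory`
num_gpus = 2
detected_per_gpu_mb = 16000               # assumed accelerator-manager reading

gpu_memory_total = (
    configured_total_mb
    if configured_total_mb
    else num_gpus * detected_per_gpu_mb
)
per_gpu_share = gpu_memory_total / num_gpus
print(gpu_memory_total, per_gpu_share)    # 32000 16000.0
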
@@ -215,7 +215,14 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if num_accelerators: if accelerator_resource_name == "GPU": num_gpus = num_accelerators - gpu_memory = num_accelerators * (self.gpu_memory if self.gpu_memory else accelerator_manager.get_current_node_gpu_memory()) + gpu_memory = ( + self.gpu_memory + if self.gpu_memory + else ( + num_accelerators + * accelerator_manager.get_current_node_gpu_memory() + ) + ) resources["gpu_memory"] = gpu_memory else: resources[accelerator_resource_name] = num_accelerators diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 3af977374d41f..74450965c29ef 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -381,6 +381,14 @@ def debug(address): @click.option( "--num-gpus", required=False, type=int, help="the number of GPUs on this node" ) +@click.option( + "--gpu-memory", + required=False, + type=int, + help="The total amount of memory (in megabytes) to make available to workers. " + "By default, this is set to the sum of available memory " + "from the gpus detected gpus in the node.", +) @click.option( "--resources", required=False, @@ -564,6 +572,7 @@ def start( redis_max_memory, num_cpus, num_gpus, + gpu_memory, resources, head, include_dashboard, @@ -656,6 +665,7 @@ def start( redirect_output=redirect_output, num_cpus=num_cpus, num_gpus=num_gpus, + _gpu_memory=gpu_memory, resources=resources, labels=labels_dict, autoscaling_config=autoscaling_config, diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc index d938b1e41c6b1..31cf3d73d6f92 100644 --- a/src/ray/common/scheduling/cluster_resource_data.cc +++ b/src/ray/common/scheduling/cluster_resource_data.cc @@ -54,9 +54,20 @@ NodeResources ResourceMapToNodeResources( const absl::flat_hash_map &resource_map_available, const absl::flat_hash_map &node_labels) { NodeResources node_resources; + // move gpu_memory to node labels + if (resource_map_total.find("gpu_memory") != resource_map_total.end()) { + node_labels["gpu_memory"] = + resource_map_total["gpu_memory"] / resource_map_total["GPU"]; + resource_map_total.erase("gpu_memory"); + resource_map_available.erase("gpu_memory"); + } else { + node_labels["gpu_memory"] = "0"; + } + node_resources.total = NodeResourceSet(resource_map_total); node_resources.available = NodeResourceSet(resource_map_available); node_resources.labels = node_labels; + return node_resources; } @@ -92,7 +103,8 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request, RAY_LOG(DEBUG) << "At pull manager capacity"; return false; } - const ResourceSet resource_request_adjusted = this->ProcessRelativeResource(resource_request.GetResourceSet()); + const ResourceSet resource_request_adjusted = + this->ConvertRelativeResource(resource_request.GetResourceSet()); if (!this->normal_task_resources.IsEmpty()) { auto available_resources = this->available; available_resources -= this->normal_task_resources; @@ -102,7 +114,8 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request, } bool NodeResources::IsFeasible(const ResourceRequest &resource_request) const { - const ResourceSet resource_request_adjusted = this->ProcessRelativeResource(resource_request.GetResourceSet()); + const ResourceSet resource_request_adjusted = + this->ConvertRelativeResource(resource_request.GetResourceSet()); return this->total >= resource_request_adjusted; } @@ -127,21 +140,24 @@ std::string NodeResources::DebugString() const { 
return buffer.str(); } -const ResourceSet NodeResources::ProcessRelativeResource(const ResourceSet &resource) const { - ResourceSet adjusted_resource = resource; - // adjust num_gpus <=> gpu_memory conversion - if(resource.Has(ResourceID::GPU())){ - double gpu_mem_request = resource.Get(ResourceID::GPU()).Double() * - (this->total.Get(ResourceID::GPU_Memory()).Double() / this->total.Get(ResourceID::GPU()).Double()); - adjusted_resource.Set(ResourceID::GPU_Memory(), gpu_mem_request); - } else if (resource.Has(ResourceID::GPU_Memory())){ - double total_gpu_mem = this->total.Get(ResourceID::GPU_Memory()).Double(); +const ResourceSet NodeResources::ConvertRelativeResource( + const ResourceSet &resource) const { + ResourceSet adjusted_resource = resource; + // convert gpu_memory to GPU + if (resource.Has(ResourceID::GPU_Memory())) { + double total_gpu_memory = 0; + if (this->labels.find("GPU_Memory") != this->labels.end()) { + total_gpu_mem = std::stod(this->labels["GPU_Memory"]); + } double num_gpus_request = 0; - if(total_gpu_mem > 0){ - num_gpus_request = resource.Get(ResourceID::GPU_Memory()).Double() * - (this->total.Get(ResourceID::GPU()).Double() / total_gpu_mem); + if (total_gpu_mem > 0) { + // round up to closes kResourceUnitScaling + num_gpus_request = + (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_mem) + + 1 / static_cast(2 * kResourceUnitScaling); } adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); + adjusted_resource.Set(ResourceID::GPU_Memory(), 0); } return adjusted_resource; } @@ -173,21 +189,24 @@ const NodeResourceInstanceSet &NodeResourceInstances::GetTotalResourceInstances( return this->total; }; -const ResourceSet NodeResourceInstances::ProcessRelativeResource(const ResourceSet &resource) const { - ResourceSet adjusted_resource = resource; - // adjust num_gpus <=> gpu_memory conversion - if(resource.Has(ResourceID::GPU())){ - double gpu_mem_request = resource.Get(ResourceID::GPU()).Double() * - (this->total.Get(ResourceID::GPU_Memory())[0].Double() / this->total.Get(ResourceID::GPU())[0].Double()); - adjusted_resource.Set(ResourceID::GPU_Memory(), gpu_mem_request); - } else if (resource.Has(ResourceID::GPU_Memory())){ - double total_gpu_mem = this->total.Get(ResourceID::GPU_Memory())[0].Double(); +const ResourceSet NodeResourceInstances::ConvertRelativeResource( + const ResourceSet &resource) const { + ResourceSet adjusted_resource = resource; + // convert gpu_memory to GPU + if (resource.Has(ResourceID::GPU_Memory())) { + double total_gpu_memory = 0; + if (this->labels.find("GPU_Memory") != this->labels.end()) { + total_gpu_mem = std::stod(this->labels["GPU_Memory"]); + } double num_gpus_request = 0; - if(total_gpu_mem > 0){ - num_gpus_request = resource.Get(ResourceID::GPU_Memory()).Double() * - (this->total.Get(ResourceID::GPU())[0].Double() / total_gpu_mem); + if (total_gpu_mem > 0) { + // round up to closes kResourceUnitScaling + num_gpus_request = + (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_mem) + + 1 / static_cast(2 * kResourceUnitScaling); } adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); + adjusted_resource.Set(ResourceID::GPU_Memory(), 0); } return adjusted_resource; }; diff --git a/src/ray/common/scheduling/cluster_resource_data.h b/src/ray/common/scheduling/cluster_resource_data.h index eb4bf55fbce1a..aa92716948c6b 100644 --- a/src/ray/common/scheduling/cluster_resource_data.h +++ b/src/ray/common/scheduling/cluster_resource_data.h @@ -119,18 +119,20 @@ class ResourceRequest { bool 
requires_object_store_memory_ = false; }; -// update this and resource instance set, +// update this and resource instance set, // two options, both same idea: map derived resource id to vector of resource id -// option 1: update in taskresource instance, resourceinstance set to override Set, add, etc for derived resource id -// for example Set(DerivedNodeID , abs::flat_hash_map> base_resources) -// how to not expose the base resource?? +// option 1: update in taskresource instance, resourceinstance set to override Set, add, +// etc for derived resource id for example Set(DerivedNodeID , +// abs::flat_hash_map> base_resources) how to not expose +// the base resource?? // option 2: map derived resource id to its base resource ids outside Resource instance // problem: Tryallocate is hard to handle, esp need same index for all 3 resources // + need to free all when failed allocating -// note: both no need to store DerivedResourceID => vector, as all stored as base resource id +// note: both no need to store DerivedResourceID => vector, as all stored as +// base resource id // we might need to check if gpu_memory and gpu both exists here?? @@ -350,7 +352,7 @@ class NodeResources { std::string DictString() const; // Returns adjusted ResourceSet after converting resource relative to others. // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory. - const ResourceSet ProcessRelativeResource(const ResourceSet &resource) const; + const ResourceSet ConvertRelativeResource(const ResourceSet &resource) const; }; /// Total and available capacities of each resource instance. @@ -372,7 +374,7 @@ class NodeResourceInstances { // Returns adjusted ResourceSet after converting resource relative to others. // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory. 
- const ResourceSet ProcessRelativeResource(const ResourceSet &resource) const; + const ResourceSet ConvertRelativeResource(const ResourceSet &resource) const; }; struct Node { diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 9e1e891747db2..8d30d03c9d3c8 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -31,8 +31,8 @@ NodeResourceInstanceSet::NodeResourceInstanceSet(const NodeResourceSet &total) { for (size_t i = 0; i < num_instances; i++) { instances.push_back(1.0); }; - } else if(resource_id == ResourceID::GPU_Memory() && - resource_ids.find(ResourceID::GPU()) != resource_ids.end()){ + } else if (resource_id == ResourceID::GPU_Memory() && + resource_ids.find(ResourceID::GPU()) != resource_ids.end()) { double num_gpus = total.Get(ResourceID::GPU()).Double(); for (size_t i = 0; i < static_cast(num_gpus); i++) { instances.push_back(value.Double() / num_gpus); diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index 4695f9c2c8875..b144d263d93f8 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -80,10 +80,10 @@ bool LocalResourceManager::AllocateTaskResourceInstances( const ResourceRequest &resource_request, std::shared_ptr task_allocation) { RAY_CHECK(task_allocation != nullptr); - const ResourceSet adjusted_resource_request = local_resources_.ProcessRelativeResource(resource_request.GetResourceSet()); + const ResourceSet adjusted_resource_request = + local_resources_.ConvertRelativeResource(resource_request.GetResourceSet()); // add adjust_gpu_memory here, added to NodeInstanceResourceSet - auto allocation = - local_resources_.available.TryAllocate(adjusted_resource_request); + auto allocation = local_resources_.available.TryAllocate(adjusted_resource_request); if (allocation) { *task_allocation = TaskResourceInstances(*allocation); for (const auto &resource_id : adjusted_resource_request.ResourceIds()) { From e26a0c67d5a7e3d2d18c343f18d357740f1406fb Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Wed, 15 Nov 2023 13:46:54 -0800 Subject: [PATCH 03/14] expose gpu_memory in remote function Signed-off-by: Jonathan Nitisastro --- .../ray/_private/accelerators/accelerator.py | 10 ++++++ python/ray/_private/accelerators/intel_gpu.py | 2 +- .../ray/_private/accelerators/nvidia_gpu.py | 4 +-- python/ray/_private/node.py | 2 +- python/ray/_private/resource_spec.py | 2 +- python/ray/_private/utils.py | 13 +++++-- python/ray/remote_function.py | 11 ++++++ .../scheduling/cluster_resource_data.cc | 35 ++++++++++--------- 8 files changed, 55 insertions(+), 24 deletions(-) diff --git a/python/ray/_private/accelerators/accelerator.py b/python/ray/_private/accelerators/accelerator.py index 70178094e14cd..ae9f30908a851 100644 --- a/python/ray/_private/accelerators/accelerator.py +++ b/python/ray/_private/accelerators/accelerator.py @@ -136,3 +136,13 @@ def get_ec2_instance_accelerator_type( Return None if it's unknown. """ return None + + @staticmethod + def get_current_node_accelerator_memory() -> int: + """Get the total number of accelerators of this family on the current node. + + Returns: + The detected total number of accelerators of this family. + Return 0 if the current node doesn't contain accelerators of this family. 
+ """ + return 0 diff --git a/python/ray/_private/accelerators/intel_gpu.py b/python/ray/_private/accelerators/intel_gpu.py index 50968c8e5de28..c281c06a9ec07 100644 --- a/python/ray/_private/accelerators/intel_gpu.py +++ b/python/ray/_private/accelerators/intel_gpu.py @@ -103,5 +103,5 @@ def set_current_process_visible_accelerator_ids( ] = prefix + ",".join([str(i) for i in visible_xpu_devices]) @staticmethod - def get_current_node_gpu_memory() -> int: + def get_current_node_accelerator_memory() -> int: return 0 diff --git a/python/ray/_private/accelerators/nvidia_gpu.py b/python/ray/_private/accelerators/nvidia_gpu.py index d871aad8081e4..82b429f7abff9 100644 --- a/python/ray/_private/accelerators/nvidia_gpu.py +++ b/python/ray/_private/accelerators/nvidia_gpu.py @@ -124,11 +124,11 @@ def get_ec2_instance_accelerator_type( return None @staticmethod - def get_current_node_gpu_memory() -> int: + def get_current_node_accelerator_memory() -> int: try: pynvml.nvmlInit() except pynvml.NVMLError: - return None # pynvml init failed + return 0 # pynvml init failed device_count = pynvml.nvmlDeviceGetCount() cuda_device_type = None if device_count > 0: diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 02b08b0c43cb9..8141636476f85 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -543,7 +543,7 @@ def merge_resources(env_dict, params_dict): self._ray_params.num_cpus if num_cpus is None else num_cpus, self._ray_params.num_gpus if num_gpus is None else num_gpus, self._ray_params.memory if memory is None else memory, - self._ray_params.gpu_memory if gpu_memory is None else gpu_memory, + self._ray_params._gpu_memory if gpu_memory is None else gpu_memory, self._ray_params.object_store_memory if object_store_memory is None else object_store_memory, diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index b8ffcaa11ba80..189075259f002 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -220,7 +220,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if self.gpu_memory else ( num_accelerators - * accelerator_manager.get_current_node_gpu_memory() + * accelerator_manager.get_current_node_accelerator_memory() ) ) resources["gpu_memory"] = gpu_memory diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 72e205be84ec9..41551d79dd0b7 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -361,15 +361,20 @@ def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: raise ValueError( "The resources dictionary must not contain the key 'CPU' or 'GPU'" ) - elif "memory" in resources or "object_store_memory" in resources: + elif ( + "memory" in resources + or "object_store_memory" in resources + or "_gpu_memory" in resources + ): raise ValueError( - "The resources dictionary must not " - "contain the key 'memory' or 'object_store_memory'" + "The resources dictionary must not contain the key" + " 'memory' or 'object_store_memory' or '_gpu_memory'" ) num_cpus = options_dict.get("num_cpus") num_gpus = options_dict.get("num_gpus") memory = options_dict.get("memory") + gpu_memory = options_dict.get("_gpu_memory") object_store_memory = options_dict.get("object_store_memory") accelerator_type = options_dict.get("accelerator_type") @@ -381,6 +386,8 @@ def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]: resources["memory"] = int(memory) if object_store_memory is not None: 
resources["object_store_memory"] = object_store_memory + if gpu_memory is not None: + resources["gpu_memory"] = int(gpu_memory) if accelerator_type is not None: resources[ f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}" diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 77212413ad150..93fe859433141 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -56,6 +56,8 @@ class RemoteFunction: remote function. _memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. + _gpu_memory: The gpu memory request in megabytes for this task/actor + from a single gpu, rounded down to the nearest integer. _resources: The default custom resource requirements for invocations of this remote function. _num_returns: The default number of return values for invocations @@ -110,6 +112,13 @@ def __init__( ) or "nsight" in (self._default_options.get("runtime_env") or {}): self._default_options["max_calls"] = 1 + # Either num_gpus or gpu_memory can be specified, not both. + if num_gpus != 0 and self._default_options.get("_gpu_memory"): + raise ValueError( + "Specifying both `num_gpus` and `_gpu_memory` is not allowed. " + "See more at: (link TBD)" + ) + # TODO(suquark): This is a workaround for class attributes of options. # They are being used in some other places, mostly tests. Need cleanup later. # E.g., actors uses "__ray_metadata__" to collect options, we can so something @@ -177,6 +186,8 @@ def options(self, **task_options): See :ref:`accelerator types `. memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. + _gpu_memory: The gpu memory request in megabytes for this task/actor + from a single gpu, rounded down to the nearest integer. object_store_memory: The object store memory request for actors only. 
max_calls: This specifies the maximum number of times that a given worker can execute diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc index 31cf3d73d6f92..cb87b549ee5e7 100644 --- a/src/ray/common/scheduling/cluster_resource_data.cc +++ b/src/ray/common/scheduling/cluster_resource_data.cc @@ -54,19 +54,22 @@ NodeResources ResourceMapToNodeResources( const absl::flat_hash_map &resource_map_available, const absl::flat_hash_map &node_labels) { NodeResources node_resources; + auto resource_map_total_copy = resource_map_total; + auto resource_map_available_copy = resource_map_available; + auto node_labels_copy = node_labels; // move gpu_memory to node labels if (resource_map_total.find("gpu_memory") != resource_map_total.end()) { - node_labels["gpu_memory"] = - resource_map_total["gpu_memory"] / resource_map_total["GPU"]; - resource_map_total.erase("gpu_memory"); - resource_map_available.erase("gpu_memory"); + node_labels_copy["gpu_memory"] = + resource_map_total.at("gpu_memory") / resource_map_total.at("GPU"); + resource_map_total_copy.erase("gpu_memory"); + resource_map_available_copy.erase("gpu_memory"); } else { - node_labels["gpu_memory"] = "0"; + node_labels_copy["gpu_memory"] = "0"; } - node_resources.total = NodeResourceSet(resource_map_total); - node_resources.available = NodeResourceSet(resource_map_available); - node_resources.labels = node_labels; + node_resources.total = NodeResourceSet(resource_map_total_copy); + node_resources.available = NodeResourceSet(resource_map_available_copy); + node_resources.labels = node_labels_copy; return node_resources; } @@ -146,14 +149,14 @@ const ResourceSet NodeResources::ConvertRelativeResource( // convert gpu_memory to GPU if (resource.Has(ResourceID::GPU_Memory())) { double total_gpu_memory = 0; - if (this->labels.find("GPU_Memory") != this->labels.end()) { - total_gpu_mem = std::stod(this->labels["GPU_Memory"]); + if (this->labels.find("gpu_memory") != this->labels.end()) { + total_gpu_memory = std::stod(this->labels.at("gpu_memory")); } double num_gpus_request = 0; - if (total_gpu_mem > 0) { + if (total_gpu_memory > 0) { // round up to closes kResourceUnitScaling num_gpus_request = - (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_mem) + + (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory) + 1 / static_cast(2 * kResourceUnitScaling); } adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); @@ -195,14 +198,14 @@ const ResourceSet NodeResourceInstances::ConvertRelativeResource( // convert gpu_memory to GPU if (resource.Has(ResourceID::GPU_Memory())) { double total_gpu_memory = 0; - if (this->labels.find("GPU_Memory") != this->labels.end()) { - total_gpu_mem = std::stod(this->labels["GPU_Memory"]); + if (this->labels.find("gpu_memory") != this->labels.end()) { + total_gpu_memory = std::stod(this->labels.at("gpu_memory")); } double num_gpus_request = 0; - if (total_gpu_mem > 0) { + if (total_gpu_memory > 0) { // round up to closes kResourceUnitScaling num_gpus_request = - (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_mem) + + (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory) + 1 / static_cast(2 * kResourceUnitScaling); } adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); From 1b705919300d36ec4c2d9b361fbf3aaa11860593 Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Wed, 15 Nov 2023 16:38:24 -0800 Subject: [PATCH 04/14] gpu_memory on worker and actor Signed-off-by: Jonathan Nitisastro --- 
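
The user-facing shape this commit aims at looks roughly as follows. This is a sketch of the intended usage pieced together from the docstrings in this series, not a tested snippet: the 32000/8000 MB figures are arbitrary, and argument validation (including the num_gpus/_gpu_memory exclusivity check) is still being reworked in these prototype commits.

import ray

# Node-wide total GPU memory (MB) can be supplied at startup; otherwise it is detected.
ray.init(num_gpus=2, _gpu_memory=32000)

# A task asks for 8000 MB on a single GPU instead of a GPU fraction.
# Specifying both num_gpus and _gpu_memory on the same task or actor raises ValueError.
@ray.remote(_gpu_memory=8000)
def infer(batch):
    return len(batch)

print(ray.get(infer.remote([1, 2, 3])))   # 3
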
python/ray/_private/ray_option_utils.py | 1 + python/ray/_private/resource_spec.py | 1 - python/ray/_private/worker.py | 12 ++++++++++++ python/ray/actor.py | 13 +++++++++++++ src/ray/common/scheduling/cluster_resource_data.cc | 4 +++- src/ray/raylet/local_task_manager.cc | 4 ++-- src/ray/raylet/scheduling/local_resource_manager.cc | 3 +++ 7 files changed, 34 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/ray_option_utils.py b/python/ray/_private/ray_option_utils.py index 56d998ace406d..e5b0a7672afdc 100644 --- a/python/ray/_private/ray_option_utils.py +++ b/python/ray/_private/ray_option_utils.py @@ -125,6 +125,7 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]: "name": Option((str, type(None))), "num_cpus": _resource_option("num_cpus"), "num_gpus": _resource_option("num_gpus"), + "_gpu_memory": _resource_option("_gpu_memory"), "object_store_memory": _counting_option("object_store_memory", False), # TODO(suquark): "placement_group", "placement_group_bundle_index" # and "placement_group_capture_child_tasks" are deprecated, diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 189075259f002..9d8457fa5b916 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -223,7 +223,6 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): * accelerator_manager.get_current_node_accelerator_memory() ) ) - resources["gpu_memory"] = gpu_memory else: resources[accelerator_resource_name] = num_accelerators diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index f2dff8ba6351e..aa420b7bfb9f1 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1157,6 +1157,7 @@ def init( resources: Optional[Dict[str, float]] = None, labels: Optional[Dict[str, str]] = None, object_store_memory: Optional[int] = None, + _gpu_memory: Optional[int] = None, local_mode: bool = False, ignore_reinit_error: bool = False, include_dashboard: Optional[bool] = None, @@ -1287,6 +1288,8 @@ def init( _driver_object_store_memory: Deprecated. _memory: Amount of reservable memory resource in bytes rounded down to the nearest integer. + _gpu_memory: The gpu memory request in megabytes for this task/actor + from a single gpu, rounded down to the nearest integer. _redis_password: Prevents external clients without the password from connecting to Redis if provided. _temp_dir: If provided, specifies the root temporary @@ -1562,6 +1565,7 @@ def sigterm_handler(signum, frame): dashboard_host=dashboard_host, dashboard_port=dashboard_port, memory=_memory, + _gpu_memory=_gpu_memory, object_store_memory=object_store_memory, redis_max_memory=_redis_max_memory, plasma_store_socket_name=None, @@ -1590,6 +1594,11 @@ def sigterm_handler(signum, frame): "When connecting to an existing cluster, num_cpus " "and num_gpus must not be provided." ) + if _gpu_memory is not None: + raise ValueError( + "When connecting to an existing cluster, " + "_gpu_memory must not be provided." + ) if resources is not None: raise ValueError( "When connecting to an existing cluster, " @@ -3127,6 +3136,7 @@ def remote( resources: Dict[str, float] = Undefined, accelerator_type: str = Undefined, memory: Union[int, float] = Undefined, + _gpu_memory: Union[int, float] = Undefined, max_calls: int = Undefined, max_restarts: int = Undefined, max_task_retries: int = Undefined, @@ -3296,6 +3306,8 @@ def method(self): See :ref:`accelerator types `. 
memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. + _gpu_memory: The gpu memory request in megabytes for this task/actor + from a single gpu, rounded down to the nearest integer. max_calls: Only for *remote functions*. This specifies the maximum number of times that a given worker can execute the given remote function before it must exit diff --git a/python/ray/actor.py b/python/ray/actor.py index a6adfd5e862a7..0eed079d94150 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -350,6 +350,8 @@ class _ActorClassMetadata: num_gpus: The default number of GPUs required by the actor creation task. memory: The heap memory quota for this actor. + _gpu_memory: The gpu memory request in megabytes for this task/actor + from a single gpu, rounded down to the nearest integer. resources: The default resources required by the actor creation task. accelerator_type: The specified type of accelerator required for the node on which this actor runs. @@ -376,6 +378,7 @@ def __init__( num_cpus, num_gpus, memory, + _gpu_memory, object_store_memory, resources, accelerator_type, @@ -394,6 +397,7 @@ def __init__( self.num_cpus = num_cpus self.num_gpus = num_gpus self.memory = memory + self._gpu_memory = _gpu_memory self.object_store_memory = object_store_memory self.resources = resources self.accelerator_type = accelerator_type @@ -595,6 +599,8 @@ def options(self, **actor_options): See :ref:`accelerator types `. memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. + _gpu_memory: The gpu memory request in megabytes for this task/actor + from a single gpu, rounded down to the nearest integer. object_store_memory: The object store memory request for actors only. max_restarts: This specifies the maximum number of times that the actor should be restarted when it dies @@ -718,6 +724,7 @@ def _remote(self, args=None, kwargs=None, **actor_options): num_cpus: The number of CPUs required by the actor creation task. num_gpus: The number of GPUs required by the actor creation task. memory: Restrict the heap memory usage of this actor. + _gpu_memory: Restrict the gpu memory usage of this actor. resources: The custom resources required by the actor creation task. max_concurrency: The max number of concurrent calls to allow for @@ -1424,6 +1431,12 @@ def _make_actor(cls, actor_options): Class = _modify_class(cls) _inject_tracing_into_class(Class) + if "_gpu_memory" in actor_options and "num_gpus" in actor_options: + raise ValueError( + "Specifying both `num_gpus` and `_gpu_memory` is not allowed. 
" + "See more at: (link TBD)" + ) + if "max_restarts" in actor_options: if actor_options["max_restarts"] != -1: # -1 represents infinite restart # Make sure we don't pass too big of an int to C++, causing diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc index cb87b549ee5e7..7048c40867a12 100644 --- a/src/ray/common/scheduling/cluster_resource_data.cc +++ b/src/ray/common/scheduling/cluster_resource_data.cc @@ -59,8 +59,10 @@ NodeResources ResourceMapToNodeResources( auto node_labels_copy = node_labels; // move gpu_memory to node labels if (resource_map_total.find("gpu_memory") != resource_map_total.end()) { + RAY_LOG(INFO) << resource_map_total.at("gpu_memory"); node_labels_copy["gpu_memory"] = - resource_map_total.at("gpu_memory") / resource_map_total.at("GPU"); + std::to_string(resource_map_total.at("gpu_memory") / resource_map_total.at("GPU")); + //RAY_CHECK(std::stod(node_labels_copy.at("gpu_memory")) == 1000); resource_map_total_copy.erase("gpu_memory"); resource_map_available_copy.erase("gpu_memory"); } else { diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index 7be0d7430fb86..c2273f825b336 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -430,9 +430,9 @@ bool LocalTaskManager::PoppedWorkerHandler( const auto &required_resource = task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); - for (auto &entry : required_resource) { + for (auto &entry : required_resource) { // it fails here if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist( - scheduling::ResourceID(entry.first))) { + scheduling::ResourceID(entry.first)) && entry.first != "gpu_memory") { RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first != PlacementGroupID::Nil()); RAY_LOG(DEBUG) << "The placement group: " diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index b144d263d93f8..512d1a4c76f8e 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -82,6 +82,9 @@ bool LocalResourceManager::AllocateTaskResourceInstances( RAY_CHECK(task_allocation != nullptr); const ResourceSet adjusted_resource_request = local_resources_.ConvertRelativeResource(resource_request.GetResourceSet()); + if (resource_request.GetResourceSet().Has(ResourceID::GPU_Memory()) && adjusted_resource_request.Get(ResourceID::GPU()) > 1) { + return false; + } // add adjust_gpu_memory here, added to NodeInstanceResourceSet auto allocation = local_resources_.available.TryAllocate(adjusted_resource_request); if (allocation) { From 89dca92007348708c982cff3cf73a3c401220fda Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Wed, 15 Nov 2023 21:31:33 -0800 Subject: [PATCH 05/14] add request only resource for gpu memory Signed-off-by: Jonathan Nitisastro --- src/ray/common/ray_config_def.h | 6 ++++++ .../scheduling/cluster_resource_data.cc | 8 ++++---- src/ray/common/scheduling/scheduling_ids.cc | 19 +++++++++++++++++++ src/ray/common/scheduling/scheduling_ids.h | 5 +++++ src/ray/raylet/local_task_manager.cc | 6 ++++-- .../scheduling/local_resource_manager.cc | 3 ++- 6 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 969265cc64bb9..9c44a4202102b 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -718,6 
+718,12 @@ RAY_CONFIG(std::string, predefined_unit_instance_resources, "GPU") /// When set it to "neuron_cores,TPU,FPGA", we will also treat FPGA as unit_instance. RAY_CONFIG(std::string, custom_unit_instance_resources, "neuron_cores,TPU") +/// The scheduler will treat these resource as resource which can be requested +/// but not stored as NodeResources. The main reason is the resource is different +/// representation of other resource stored in NodeResources. +/// For example: gpu_memory and GPU. +RAY_CONFIG(std::string, request_only_resources, "gpu_memory") + // Maximum size of the batches when broadcasting resources to raylet. RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512) diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc index 7048c40867a12..74f42b15374b2 100644 --- a/src/ray/common/scheduling/cluster_resource_data.cc +++ b/src/ray/common/scheduling/cluster_resource_data.cc @@ -59,10 +59,9 @@ NodeResources ResourceMapToNodeResources( auto node_labels_copy = node_labels; // move gpu_memory to node labels if (resource_map_total.find("gpu_memory") != resource_map_total.end()) { - RAY_LOG(INFO) << resource_map_total.at("gpu_memory"); - node_labels_copy["gpu_memory"] = - std::to_string(resource_map_total.at("gpu_memory") / resource_map_total.at("GPU")); - //RAY_CHECK(std::stod(node_labels_copy.at("gpu_memory")) == 1000); + node_labels_copy["gpu_memory"] = std::to_string(resource_map_total.at("gpu_memory") / + resource_map_total.at("GPU")); + // RAY_CHECK(std::stod(node_labels_copy.at("gpu_memory")) == 1000); resource_map_total_copy.erase("gpu_memory"); resource_map_available_copy.erase("gpu_memory"); } else { @@ -152,6 +151,7 @@ const ResourceSet NodeResources::ConvertRelativeResource( if (resource.Has(ResourceID::GPU_Memory())) { double total_gpu_memory = 0; if (this->labels.find("gpu_memory") != this->labels.end()) { + // TODO: raise exception if this is not true total_gpu_memory = std::stod(this->labels.at("gpu_memory")); } double num_gpus_request = 0; diff --git a/src/ray/common/scheduling/scheduling_ids.cc b/src/ray/common/scheduling/scheduling_ids.cc index 61faab47b1d27..fcf7092e3a583 100644 --- a/src/ray/common/scheduling/scheduling_ids.cc +++ b/src/ray/common/scheduling/scheduling_ids.cc @@ -113,6 +113,25 @@ absl::flat_hash_set &ResourceID::UnitInstanceResources() { return set; } +absl::flat_hash_set &ResourceID::RequestOnlyResources() { + static absl::flat_hash_set set{[]() { + absl::flat_hash_set res; + + std::string request_only_resources = RayConfig::instance().request_only_resources(); + if (!request_only_resources.empty()) { + std::vector results; + boost::split(results, request_only_resources, boost::is_any_of(",")); + for (std::string &result : results) { + int64_t resource_id = ResourceID(result).ToInt(); + res.insert(resource_id); + } + } + + return res; + }()}; + return set; +} + } // namespace scheduling } // namespace ray diff --git a/src/ray/common/scheduling/scheduling_ids.h b/src/ray/common/scheduling/scheduling_ids.h index 7bdc81c0e1739..abc2e88ffe069 100644 --- a/src/ray/common/scheduling/scheduling_ids.h +++ b/src/ray/common/scheduling/scheduling_ids.h @@ -178,6 +178,8 @@ class ResourceID : public BaseSchedulingID { return !IsPredefinedResource() && absl::StartsWith(Binary(), kImplicitResourcePrefix); } + bool IsRequestOnlyResource() const { return RequestOnlyResources().contains(id_); } + /// Resource ID of CPU. 
static ResourceID CPU() { return ResourceID(PredefinedResourcesEnum::CPU); } @@ -203,6 +205,9 @@ class ResourceID : public BaseSchedulingID { private: /// Return the IDs of all unit-instance resources. static absl::flat_hash_set &UnitInstanceResources(); + + /// Return the IDs of all request-only-instance resources. + static absl::flat_hash_set &RequestOnlyResources(); }; } // namespace scheduling diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index c2273f825b336..31c46330089ce 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -430,9 +430,11 @@ bool LocalTaskManager::PoppedWorkerHandler( const auto &required_resource = task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); - for (auto &entry : required_resource) { // it fails here + for (auto &entry : required_resource) { + scheduling::ResourceID resource_id(entry.first); if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist( - scheduling::ResourceID(entry.first)) && entry.first != "gpu_memory") { + resource_id) && + !resource_id.IsRequestOnlyResource()) { RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first != PlacementGroupID::Nil()); RAY_LOG(DEBUG) << "The placement group: " diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index 512d1a4c76f8e..42ab0d3ae4544 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -82,7 +82,8 @@ bool LocalResourceManager::AllocateTaskResourceInstances( RAY_CHECK(task_allocation != nullptr); const ResourceSet adjusted_resource_request = local_resources_.ConvertRelativeResource(resource_request.GetResourceSet()); - if (resource_request.GetResourceSet().Has(ResourceID::GPU_Memory()) && adjusted_resource_request.Get(ResourceID::GPU()) > 1) { + if (resource_request.GetResourceSet().Has(ResourceID::GPU_Memory()) && + adjusted_resource_request.Get(ResourceID::GPU()) > 1) { return false; } // add adjust_gpu_memory here, added to NodeInstanceResourceSet From fc651f52772393997c4ba359b3d5331df6d627c3 Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Wed, 15 Nov 2023 23:52:49 -0800 Subject: [PATCH 06/14] add get gpu memory for nvidia Signed-off-by: Jonathan Nitisastro --- python/ray/_private/accelerators/nvidia_gpu.py | 12 +++++------- python/ray/actor.py | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/python/ray/_private/accelerators/nvidia_gpu.py b/python/ray/_private/accelerators/nvidia_gpu.py index 82b429f7abff9..83916ed65cd94 100644 --- a/python/ray/_private/accelerators/nvidia_gpu.py +++ b/python/ray/_private/accelerators/nvidia_gpu.py @@ -130,13 +130,11 @@ def get_current_node_accelerator_memory() -> int: except pynvml.NVMLError: return 0 # pynvml init failed device_count = pynvml.nvmlDeviceGetCount() - cuda_device_type = None + cuda_device_memory = 0 if device_count > 0: handle = pynvml.nvmlDeviceGetHandleByIndex(0) - cuda_device_type = ( - NvidiaGPUAcceleratorManager._gpu_name_to_accelerator_type( - pynvml.nvmlDeviceGetName(handle) - ) - ) + cuda_device_memory = int(pynvml.nvmlDeviceGetMemoryInfo(handle).total) // ( + 1024 * 1024 + ) # in MB pynvml.nvmlShutdown() - return cuda_device_type + return cuda_device_memory diff --git a/python/ray/actor.py b/python/ray/actor.py index 0eed079d94150..3717769b7d135 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1431,7 +1431,7 @@ def _make_actor(cls, 
actor_options): Class = _modify_class(cls) _inject_tracing_into_class(Class) - if "_gpu_memory" in actor_options and "num_gpus" in actor_options: + if actor_options.get("_gpu_memory", None) and actor_options.get("num_gpus", None): raise ValueError( "Specifying both `num_gpus` and `_gpu_memory` is not allowed. " "See more at: (link TBD)" From 00e9f745e6952858cda4d5048db57cf7ae6288b7 Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Thu, 16 Nov 2023 19:44:08 -0800 Subject: [PATCH 07/14] autoscaler fix Signed-off-by: Jonathan Nitisastro --- src/ray/raylet/scheduling/cluster_resource_manager.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc index 9f2a007a128da..5a529d85c05ec 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.cc +++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc @@ -187,8 +187,10 @@ bool ClusterResourceManager::SubtractNodeAvailableResources( } NodeResources *resources = it->second.GetMutableLocalView(); + const ResourceSet adjusted_resource_request = + resources.ConvertRelativeResource(resource_request.GetResourceSet()); - resources->available -= resource_request.GetResourceSet(); + resources->available -= adjusted_resource_request; resources->available.RemoveNegative(); // TODO(swang): We should also subtract object store memory if the task has @@ -214,7 +216,10 @@ bool ClusterResourceManager::HasSufficientResource( return false; } - return resources.available >= resource_request.GetResourceSet(); + const ResourceSet adjusted_resource_request = + resources.ConvertRelativeResource(resource_request.GetResourceSet()); + + return resources.available >= adjusted_resource_request; } bool ClusterResourceManager::AddNodeAvailableResources(scheduling::NodeID node_id, From 73748ea7308968499142d884198e99b88d6c3e62 Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Thu, 16 Nov 2023 19:59:03 -0800 Subject: [PATCH 08/14] cleanup Signed-off-by: Jonathan Nitisastro --- .../common/scheduling/cluster_resource_data.cc | 3 +-- .../common/scheduling/cluster_resource_data.h | 17 ----------------- .../scheduling/cluster_resource_manager.cc | 2 +- 3 files changed, 2 insertions(+), 20 deletions(-) diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc index 74f42b15374b2..30ae522c2e316 100644 --- a/src/ray/common/scheduling/cluster_resource_data.cc +++ b/src/ray/common/scheduling/cluster_resource_data.cc @@ -57,11 +57,10 @@ NodeResources ResourceMapToNodeResources( auto resource_map_total_copy = resource_map_total; auto resource_map_available_copy = resource_map_available; auto node_labels_copy = node_labels; - // move gpu_memory to node labels + if (resource_map_total.find("gpu_memory") != resource_map_total.end()) { node_labels_copy["gpu_memory"] = std::to_string(resource_map_total.at("gpu_memory") / resource_map_total.at("GPU")); - // RAY_CHECK(std::stod(node_labels_copy.at("gpu_memory")) == 1000); resource_map_total_copy.erase("gpu_memory"); resource_map_available_copy.erase("gpu_memory"); } else { diff --git a/src/ray/common/scheduling/cluster_resource_data.h b/src/ray/common/scheduling/cluster_resource_data.h index aa92716948c6b..50ef8c2f3c941 100644 --- a/src/ray/common/scheduling/cluster_resource_data.h +++ b/src/ray/common/scheduling/cluster_resource_data.h @@ -119,23 +119,6 @@ class ResourceRequest { bool requires_object_store_memory_ = false; }; -// update 
this and resource instance set, -// two options, both same idea: map derived resource id to vector of resource id - -// option 1: update in taskresource instance, resourceinstance set to override Set, add, -// etc for derived resource id for example Set(DerivedNodeID , -// abs::flat_hash_map> base_resources) how to not expose -// the base resource?? - -// option 2: map derived resource id to its base resource ids outside Resource instance -// problem: Tryallocate is hard to handle, esp need same index for all 3 resources -// + need to free all when failed allocating - -// note: both no need to store DerivedResourceID => vector, as all stored as -// base resource id - -// we might need to check if gpu_memory and gpu both exists here?? - /// Represents a resource set that contains the per-instance resource values. /// NOTE, unlike ResourceRequest, zero values won't be automatically removed in this /// class. Because otherwise we will lose the number of instances the set originally had diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc index 5a529d85c05ec..c2269ffd04669 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.cc +++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc @@ -188,7 +188,7 @@ bool ClusterResourceManager::SubtractNodeAvailableResources( NodeResources *resources = it->second.GetMutableLocalView(); const ResourceSet adjusted_resource_request = - resources.ConvertRelativeResource(resource_request.GetResourceSet()); + resources->ConvertRelativeResource(resource_request.GetResourceSet()); resources->available -= adjusted_resource_request; resources->available.RemoveNegative(); From e7c443cc39a86834b8461db87e8b04a189079989 Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Fri, 17 Nov 2023 09:18:22 -0800 Subject: [PATCH 09/14] nit fix Signed-off-by: Jonathan Nitisastro --- python/ray/_private/worker.py | 4 ++-- python/ray/actor.py | 5 ++--- python/ray/remote_function.py | 4 ++-- src/ray/common/scheduling/resource_instance_set.cc | 13 ++++--------- 4 files changed, 10 insertions(+), 16 deletions(-) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index aa420b7bfb9f1..e7241d7727018 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1289,7 +1289,7 @@ def init( _memory: Amount of reservable memory resource in bytes rounded down to the nearest integer. _gpu_memory: The gpu memory request in megabytes for this task/actor - from a single gpu, rounded down to the nearest integer. + from a single gpu, rounded up to the nearest integer. _redis_password: Prevents external clients without the password from connecting to Redis if provided. _temp_dir: If provided, specifies the root temporary @@ -3307,7 +3307,7 @@ def method(self): memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. _gpu_memory: The gpu memory request in megabytes for this task/actor - from a single gpu, rounded down to the nearest integer. + from a single gpu, rounded up to the nearest integer. max_calls: Only for *remote functions*. This specifies the maximum number of times that a given worker can execute the given remote function before it must exit diff --git a/python/ray/actor.py b/python/ray/actor.py index 3717769b7d135..d7f0e8c8a2787 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -350,8 +350,7 @@ class _ActorClassMetadata: num_gpus: The default number of GPUs required by the actor creation task. 
memory: The heap memory quota for this actor. - _gpu_memory: The gpu memory request in megabytes for this task/actor - from a single gpu, rounded down to the nearest integer. + _gpu_memory: The gpu memory request in megabytes for this actor. resources: The default resources required by the actor creation task. accelerator_type: The specified type of accelerator required for the node on which this actor runs. @@ -600,7 +599,7 @@ def options(self, **actor_options): memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. _gpu_memory: The gpu memory request in megabytes for this task/actor - from a single gpu, rounded down to the nearest integer. + from a single gpu, rounded up to the nearest integer. object_store_memory: The object store memory request for actors only. max_restarts: This specifies the maximum number of times that the actor should be restarted when it dies diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 93fe859433141..32034a6218957 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -57,7 +57,7 @@ class RemoteFunction: _memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. _gpu_memory: The gpu memory request in megabytes for this task/actor - from a single gpu, rounded down to the nearest integer. + from a single gpu, rounded up to the nearest integer. _resources: The default custom resource requirements for invocations of this remote function. _num_returns: The default number of return values for invocations @@ -187,7 +187,7 @@ def options(self, **task_options): memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. _gpu_memory: The gpu memory request in megabytes for this task/actor - from a single gpu, rounded down to the nearest integer. + from a single gpu, rounded up to the nearest integer. object_store_memory: The object store memory request for actors only. 
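The `_gpu_memory` option documented above is mutually exclusive with `num_gpus` (the validation added in patch 06 rejects setting both). A usage sketch of the API this series is building; it assumes a Ray build that includes these patches, and the values are illustrative (the unit is megabytes at this point in the series and switches to bytes in a later patch).

    # Usage sketch for the option documented above; assumes a Ray build that
    # includes this patch series. Values are illustrative.
    import ray


    @ray.remote(_gpu_memory=8 * 1024)  # request 8 GiB of memory on a single GPU
    def on_part_of_a_gpu():
        return "scheduled by GPU memory"


    @ray.remote(num_gpus=0.5)  # the existing fractional-GPU style of request
    def on_half_a_gpu():
        return "scheduled by GPU fraction"


    # Setting both on one task or actor raises ValueError, per the validation
    # added earlier in the series:
    # @ray.remote(num_gpus=1, _gpu_memory=8 * 1024)  # -> ValueError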
max_calls: This specifies the maximum number of times that a given worker can execute diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 8d30d03c9d3c8..80e985b27c83b 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -22,8 +22,7 @@ namespace ray { NodeResourceInstanceSet::NodeResourceInstanceSet(const NodeResourceSet &total) { - auto resource_ids = total.ExplicitResourceIds(); - for (auto &resource_id : resource_ids) { + for (auto &resource_id : total.ExplicitResourceIds()) { std::vector instances; auto value = total.Get(resource_id); if (resource_id.IsUnitInstanceResource()) { @@ -31,13 +30,6 @@ NodeResourceInstanceSet::NodeResourceInstanceSet(const NodeResourceSet &total) { for (size_t i = 0; i < num_instances; i++) { instances.push_back(1.0); }; - } else if (resource_id == ResourceID::GPU_Memory() && - resource_ids.find(ResourceID::GPU()) != resource_ids.end()) { - double num_gpus = total.Get(ResourceID::GPU()).Double(); - for (size_t i = 0; i < static_cast(num_gpus); i++) { - instances.push_back(value.Double() / num_gpus); - }; - } else { instances.push_back(value); } @@ -101,6 +93,7 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c std::optional>> NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { absl::flat_hash_map> allocations; + // update this to TryAllocateBundle for (const auto &[resource_id, demand] : resource_demands.Resources()) { auto allocation = TryAllocate(resource_id, demand); if (allocation) { @@ -142,6 +135,8 @@ std::optional> NodeResourceInstanceSet::TryAllocate( return std::nullopt; } } + // need to update this to support instance > 1 + // still unit tho, might need to create different TryAllocate function // If resources has multiple instances, each instance has total capacity of 1. // From fc76d0627467097a28b5a308777a6be5306fa911 Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Mon, 20 Nov 2023 15:42:06 -0800 Subject: [PATCH 10/14] convert gpu memory from mb to bytes Signed-off-by: Jonathan Nitisastro --- python/ray/_private/accelerators/nvidia_gpu.py | 4 +--- python/ray/_private/worker.py | 4 ++-- python/ray/actor.py | 4 ++-- python/ray/remote_function.py | 4 ++-- python/ray/scripts/scripts.py | 2 +- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/python/ray/_private/accelerators/nvidia_gpu.py b/python/ray/_private/accelerators/nvidia_gpu.py index 83916ed65cd94..b2dbbc3e30751 100644 --- a/python/ray/_private/accelerators/nvidia_gpu.py +++ b/python/ray/_private/accelerators/nvidia_gpu.py @@ -133,8 +133,6 @@ def get_current_node_accelerator_memory() -> int: cuda_device_memory = 0 if device_count > 0: handle = pynvml.nvmlDeviceGetHandleByIndex(0) - cuda_device_memory = int(pynvml.nvmlDeviceGetMemoryInfo(handle).total) // ( - 1024 * 1024 - ) # in MB + cuda_device_memory = int(pynvml.nvmlDeviceGetMemoryInfo(handle).total) pynvml.nvmlShutdown() return cuda_device_memory diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index e7241d7727018..295b6f799ecc8 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1288,7 +1288,7 @@ def init( _driver_object_store_memory: Deprecated. _memory: Amount of reservable memory resource in bytes rounded down to the nearest integer. 
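With the special case removed above, `gpu_memory` is no longer split across GPU instances at construction time; the `NodeResourceInstanceSet` constructor simply expands each unit-instance resource into capacity-1 instances and keeps everything else as a single instance. A simplified plain-dict sketch of that expansion (illustrative values only).

    # Minimal sketch of how a node resource set is expanded into per-instance
    # capacities, mirroring the NodeResourceInstanceSet constructor above.
    from typing import Dict, List

    UNIT_INSTANCE_RESOURCES = {"GPU", "TPU", "neuron_cores"}


    def to_instances(total: Dict[str, float]) -> Dict[str, List[float]]:
        instances = {}
        for resource, value in total.items():
            if resource in UNIT_INSTANCE_RESOURCES:
                # One capacity-1 instance per unit, e.g. 4 GPUs -> [1, 1, 1, 1].
                instances[resource] = [1.0] * int(value)
            else:
                # Non unit-instance resources stay as a single instance.
                instances[resource] = [value]
        return instances


    print(to_instances({"CPU": 8, "GPU": 4, "memory": 32 * 1024**3}))
    # {'CPU': [8], 'GPU': [1.0, 1.0, 1.0, 1.0], 'memory': [34359738368]}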
- _gpu_memory: The gpu memory request in megabytes for this task/actor + _gpu_memory: The gpu memory request in bytes for this task/actor from a single gpu, rounded up to the nearest integer. _redis_password: Prevents external clients without the password from connecting to Redis if provided. @@ -3306,7 +3306,7 @@ def method(self): See :ref:`accelerator types `. memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. - _gpu_memory: The gpu memory request in megabytes for this task/actor + _gpu_memory: The gpu memory request in bytes for this task/actor from a single gpu, rounded up to the nearest integer. max_calls: Only for *remote functions*. This specifies the maximum number of times that a given worker can execute diff --git a/python/ray/actor.py b/python/ray/actor.py index d7f0e8c8a2787..dd40daf5a59c7 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -350,7 +350,7 @@ class _ActorClassMetadata: num_gpus: The default number of GPUs required by the actor creation task. memory: The heap memory quota for this actor. - _gpu_memory: The gpu memory request in megabytes for this actor. + _gpu_memory: The gpu memory request in bytes for this actor. resources: The default resources required by the actor creation task. accelerator_type: The specified type of accelerator required for the node on which this actor runs. @@ -598,7 +598,7 @@ def options(self, **actor_options): See :ref:`accelerator types `. memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. - _gpu_memory: The gpu memory request in megabytes for this task/actor + _gpu_memory: The gpu memory request in bytes for this task/actor from a single gpu, rounded up to the nearest integer. object_store_memory: The object store memory request for actors only. max_restarts: This specifies the maximum diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 32034a6218957..7a0367eb1366b 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -56,7 +56,7 @@ class RemoteFunction: remote function. _memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. - _gpu_memory: The gpu memory request in megabytes for this task/actor + _gpu_memory: The gpu memory request in bytes for this task/actor from a single gpu, rounded up to the nearest integer. _resources: The default custom resource requirements for invocations of this remote function. @@ -186,7 +186,7 @@ def options(self, **task_options): See :ref:`accelerator types `. memory: The heap memory request in bytes for this task/actor, rounded down to the nearest integer. - _gpu_memory: The gpu memory request in megabytes for this task/actor + _gpu_memory: The gpu memory request in bytes for this task/actor from a single gpu, rounded up to the nearest integer. object_store_memory: The object store memory request for actors only. max_calls: This specifies the diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 74450965c29ef..4673db2433ae3 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -385,7 +385,7 @@ def debug(address): "--gpu-memory", required=False, type=int, - help="The total amount of memory (in megabytes) to make available to workers. " + help="The total amount of memory (in bytes) to make available to workers. 
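Patch 10 switches the accounting to bytes end-to-end, and the NVML detection above now returns the raw byte total. A standalone sketch of the same lookup, guarded so it degrades to 0 when pynvml or a driver is absent; the helper name is an assumption, the pynvml calls are the ones used in the patch.

    # Standalone sketch of the NVML-based detection above: total memory of the
    # first visible NVIDIA GPU in bytes, or 0 if NVML is unavailable.
    def detect_gpu_memory_bytes() -> int:
        try:
            import pynvml
            pynvml.nvmlInit()
        except Exception:
            return 0  # pynvml missing or NVML init failed
        try:
            if pynvml.nvmlDeviceGetCount() == 0:
                return 0
            handle = pynvml.nvmlDeviceGetHandleByIndex(0)
            return int(pynvml.nvmlDeviceGetMemoryInfo(handle).total)
        except pynvml.NVMLError:
            return 0
        finally:
            pynvml.nvmlShutdown()


    print(detect_gpu_memory_bytes())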
" "By default, this is set to the sum of available memory " "from the gpus detected gpus in the node.", ) From 31cf7244c4a9dcc3dbc361828e0b99ffef267fa9 Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Mon, 20 Nov 2023 22:06:25 -0800 Subject: [PATCH 11/14] autoscaler support Signed-off-by: Jonathan Nitisastro --- .../_private/fake_multi_node/node_provider.py | 1 + .../_private/resource_demand_scheduler.py | 55 +++++++++++++++++-- python/ray/cluster_utils.py | 2 + .../scheduling/cluster_resource_data.cc | 34 ++++++------ .../common/scheduling/cluster_resource_data.h | 4 +- .../scheduling/resource_instance_set.cc | 3 - .../scheduling/cluster_resource_manager.cc | 4 +- .../scheduling/local_resource_manager.cc | 2 +- 8 files changed, 75 insertions(+), 30 deletions(-) diff --git a/python/ray/autoscaler/_private/fake_multi_node/node_provider.py b/python/ray/autoscaler/_private/fake_multi_node/node_provider.py index 29fdc3d0490fa..deeb584e66522 100644 --- a/python/ray/autoscaler/_private/fake_multi_node/node_provider.py +++ b/python/ray/autoscaler/_private/fake_multi_node/node_provider.py @@ -316,6 +316,7 @@ def create_node_with_resources_and_labels( num_cpus=resources.pop("CPU", 0), num_gpus=resources.pop("GPU", 0), object_store_memory=resources.pop("object_store_memory", None), + _gpu_memory=resources.pop("gpu_memory", 0), resources=resources, labels=labels, redis_address="{}:6379".format( diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 1db20be69c622..4cfe3b07723c7 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -109,7 +109,7 @@ def __init__( upscaling_speed: float, ) -> None: self.provider = provider - self.node_types = copy.deepcopy(node_types) + self.node_types = self._adjust_node_types(copy.deepcopy(node_types)) self.node_resource_updated = set() self.max_workers = max_workers self.head_node_type = head_node_type @@ -151,12 +151,27 @@ def reset_config( inferered resources are not lost. 
""" self.provider = provider - self.node_types = copy.deepcopy(node_types) + self.node_types = self._adjust_node_types(copy.deepcopy(node_types)) self.node_resource_updated = set() self.max_workers = max_workers self.head_node_type = head_node_type self.upscaling_speed = upscaling_speed + def _adjust_node_types(self, node_types): + # update available_node_types gpu_memory to gpu_memory_per_gpu + for node_type, node_config in node_types.items(): + resources = node_config["resources"] + if "gpu_memory" in resources: + if "GPU" in resources and resources["GPU"] > 0: + resources["node:gpu_memory_per_gpu"] = ( + resources["gpu_memory"] / resources["GPU"] + ) + else: + resources["node:gpu_memory_per_gpu"] = 0 + del resources["gpu_memory"] + node_types[node_type] = node_config + return node_types + def is_feasible(self, bundle: ResourceDict) -> bool: for node_type, config in self.node_types.items(): max_of_type = config.get("max_workers", 0) @@ -372,6 +387,10 @@ def _update_node_resources_from_runtime( for key in ["CPU", "GPU", "memory", "object_store_memory"]: if key in runtime_resources: resources[key] = runtime_resources[key] + if "gpu_memory" in runtime_resources and "GPU" in runtime_resources: + resources["node:gpu_memory_per_gpu"] = int( + runtime_resources["gpu_memory"] + ) / int(runtime_resources["GPU"]) self.node_types[node_type]["resources"] = resources node_kind = tags[TAG_RAY_NODE_KIND] @@ -823,7 +842,7 @@ def _resource_based_utilization_scorer( num_matching_resource_types = 0 for k, v in node_resources.items(): # Don't divide by zero. - if v < 1: + if v < 1 or k == "node::gpu_memory_per_gpu": # Could test v == 0 on the nose, but v < 1 feels safer. # (Note that node resources are integers.) continue @@ -931,8 +950,31 @@ def get_bin_pack_residual( return unfulfilled, nodes + used +def _convert_relative_resources( + node: ResourceDict, resources: ResourceDict +) -> Optional[ResourceDict]: + # return None if relative resources can't be converted + adjusted_resources = resources.copy() + if "gpu_memory" in resources: + if ( + "node:gpu_memory_per_gpu" not in node + or node["node:gpu_memory_per_gpu"] == 0 + ): + return None + adjusted_resources["GPU"] = ( + resources["gpu_memory"] / node["node:gpu_memory_per_gpu"] + ) + if adjusted_resources["GPU"] > 1.0: + return None + del adjusted_resources["gpu_memory"] + return adjusted_resources + + def _fits(node: ResourceDict, resources: ResourceDict) -> bool: - for k, v in resources.items(): + adjusted_resources = _convert_relative_resources(node, resources) + if adjusted_resources is None: + return False + for k, v in adjusted_resources.items(): # TODO(jjyao): Change ResourceDict to a class so we can # hide the implicit resource handling. if v > node.get( @@ -943,7 +985,10 @@ def _fits(node: ResourceDict, resources: ResourceDict) -> bool: def _inplace_subtract(node: ResourceDict, resources: ResourceDict) -> None: - for k, v in resources.items(): + adjusted_resources = _convert_relative_resources(node, resources) + if adjusted_resources is None: + return + for k, v in adjusted_resources.items(): if v == 0: # This is an edge case since some reasonable programs/computers can # do `ray.autoscaler.sdk.request_resources({"GPU": 0}"})`. 
diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index eaf446f3620c2..848a4d65c4a3e 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -85,6 +85,8 @@ def start(self, _system_config=None, override_env: Optional[Dict] = None): self._head_resources.pop("object_store_memory") ) ) + if "gpu_memory" in self._head_resources: + cmd.append("--gpu-memory={}".format(self._head_resources.pop("gpu_memory"))) if self._head_resources: cmd.append("--resources='{}'".format(json.dumps(self._head_resources))) if _system_config is not None: diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc index 30ae522c2e316..0bbe23510f210 100644 --- a/src/ray/common/scheduling/cluster_resource_data.cc +++ b/src/ray/common/scheduling/cluster_resource_data.cc @@ -59,12 +59,12 @@ NodeResources ResourceMapToNodeResources( auto node_labels_copy = node_labels; if (resource_map_total.find("gpu_memory") != resource_map_total.end()) { - node_labels_copy["gpu_memory"] = std::to_string(resource_map_total.at("gpu_memory") / - resource_map_total.at("GPU")); + node_labels_copy["_gpu_memory_per_gpu"] = std::to_string( + resource_map_total.at("gpu_memory") / resource_map_total.at("GPU")); resource_map_total_copy.erase("gpu_memory"); resource_map_available_copy.erase("gpu_memory"); } else { - node_labels_copy["gpu_memory"] = "0"; + node_labels_copy["_gpu_memory_per_gpu"] = "0"; } node_resources.total = NodeResourceSet(resource_map_total_copy); @@ -107,7 +107,7 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request, return false; } const ResourceSet resource_request_adjusted = - this->ConvertRelativeResource(resource_request.GetResourceSet()); + this->ConvertRelativeResources(resource_request.GetResourceSet()); if (!this->normal_task_resources.IsEmpty()) { auto available_resources = this->available; available_resources -= this->normal_task_resources; @@ -118,7 +118,7 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request, bool NodeResources::IsFeasible(const ResourceRequest &resource_request) const { const ResourceSet resource_request_adjusted = - this->ConvertRelativeResource(resource_request.GetResourceSet()); + this->ConvertRelativeResources(resource_request.GetResourceSet()); return this->total >= resource_request_adjusted; } @@ -143,21 +143,21 @@ std::string NodeResources::DebugString() const { return buffer.str(); } -const ResourceSet NodeResources::ConvertRelativeResource( +const ResourceSet NodeResources::ConvertRelativeResources( const ResourceSet &resource) const { ResourceSet adjusted_resource = resource; // convert gpu_memory to GPU if (resource.Has(ResourceID::GPU_Memory())) { - double total_gpu_memory = 0; - if (this->labels.find("gpu_memory") != this->labels.end()) { + double total_gpu_memory_per_gpu = 0; + if (this->labels.find("_gpu_memory_per_gpu") != this->labels.end()) { // TODO: raise exception if this is not true - total_gpu_memory = std::stod(this->labels.at("gpu_memory")); + total_gpu_memory_per_gpu = std::stod(this->labels.at("_gpu_memory_per_gpu")); } double num_gpus_request = 0; - if (total_gpu_memory > 0) { + if (total_gpu_memory_per_gpu > 0) { // round up to closes kResourceUnitScaling num_gpus_request = - (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory) + + (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory_per_gpu) + 1 / static_cast(2 * kResourceUnitScaling); } adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); 
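Per the in-code comment above, the conversion adds half a resource unit (`1 / (2 * kResourceUnitScaling)`) so the derived GPU fraction rounds up at Ray's fixed-point granularity rather than under-allocating. A worked example, assuming the scaling constant is 10000 (1e-4 granularity, the default in current Ray sources); treat that value as an assumption if the build configures it differently.

    # Worked example of the gpu_memory -> GPU conversion above; the resource
    # resolution of 1/10000 is an assumption documented in the lead-in.
    K_RESOURCE_UNIT_SCALING = 10000  # fractional resources are multiples of 1e-4


    def gpu_memory_to_gpus(request_bytes: float, per_gpu_bytes: float) -> float:
        """Convert a GPU-memory request into a GPU fraction, rounded up."""
        raw = request_bytes / per_gpu_bytes
        # Adding half a unit before the scheduler snaps to 1e-4 precision keeps
        # the allocated fraction from falling below the requested memory.
        return raw + 1.0 / (2 * K_RESOURCE_UNIT_SCALING)


    per_gpu = 16 * 1024**3  # a 16 GiB GPU
    print(gpu_memory_to_gpus(10 * 1024**3, per_gpu))  # ~0.62505, just over 10/16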
@@ -193,20 +193,20 @@ const NodeResourceInstanceSet &NodeResourceInstances::GetTotalResourceInstances( return this->total; }; -const ResourceSet NodeResourceInstances::ConvertRelativeResource( +const ResourceSet NodeResourceInstances::ConvertRelativeResources( const ResourceSet &resource) const { ResourceSet adjusted_resource = resource; // convert gpu_memory to GPU if (resource.Has(ResourceID::GPU_Memory())) { - double total_gpu_memory = 0; - if (this->labels.find("gpu_memory") != this->labels.end()) { - total_gpu_memory = std::stod(this->labels.at("gpu_memory")); + double total_gpu_memory_per_gpu = 0; + if (this->labels.find("_gpu_memory_per_gpu") != this->labels.end()) { + total_gpu_memory_per_gpu = std::stod(this->labels.at("_gpu_memory_per_gpu")); } double num_gpus_request = 0; - if (total_gpu_memory > 0) { + if (total_gpu_memory_per_gpu > 0) { // round up to closes kResourceUnitScaling num_gpus_request = - (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory) + + (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory_per_gpu) + 1 / static_cast(2 * kResourceUnitScaling); } adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); diff --git a/src/ray/common/scheduling/cluster_resource_data.h b/src/ray/common/scheduling/cluster_resource_data.h index 50ef8c2f3c941..97a4f6c3e9033 100644 --- a/src/ray/common/scheduling/cluster_resource_data.h +++ b/src/ray/common/scheduling/cluster_resource_data.h @@ -335,7 +335,7 @@ class NodeResources { std::string DictString() const; // Returns adjusted ResourceSet after converting resource relative to others. // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory. - const ResourceSet ConvertRelativeResource(const ResourceSet &resource) const; + const ResourceSet ConvertRelativeResources(const ResourceSet &resource) const; }; /// Total and available capacities of each resource instance. @@ -357,7 +357,7 @@ class NodeResourceInstances { // Returns adjusted ResourceSet after converting resource relative to others. // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory. - const ResourceSet ConvertRelativeResource(const ResourceSet &resource) const; + const ResourceSet ConvertRelativeResources(const ResourceSet &resource) const; }; struct Node { diff --git a/src/ray/common/scheduling/resource_instance_set.cc b/src/ray/common/scheduling/resource_instance_set.cc index 80e985b27c83b..976deee39383e 100644 --- a/src/ray/common/scheduling/resource_instance_set.cc +++ b/src/ray/common/scheduling/resource_instance_set.cc @@ -93,7 +93,6 @@ bool NodeResourceInstanceSet::operator==(const NodeResourceInstanceSet &other) c std::optional>> NodeResourceInstanceSet::TryAllocate(const ResourceSet &resource_demands) { absl::flat_hash_map> allocations; - // update this to TryAllocateBundle for (const auto &[resource_id, demand] : resource_demands.Resources()) { auto allocation = TryAllocate(resource_id, demand); if (allocation) { @@ -135,8 +134,6 @@ std::optional> NodeResourceInstanceSet::TryAllocate( return std::nullopt; } } - // need to update this to support instance > 1 - // still unit tho, might need to create different TryAllocate function // If resources has multiple instances, each instance has total capacity of 1. 
// diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc index c2269ffd04669..f0a6afed6c84a 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.cc +++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc @@ -188,7 +188,7 @@ bool ClusterResourceManager::SubtractNodeAvailableResources( NodeResources *resources = it->second.GetMutableLocalView(); const ResourceSet adjusted_resource_request = - resources->ConvertRelativeResource(resource_request.GetResourceSet()); + resources->ConvertRelativeResources(resource_request.GetResourceSet()); resources->available -= adjusted_resource_request; resources->available.RemoveNegative(); @@ -217,7 +217,7 @@ bool ClusterResourceManager::HasSufficientResource( } const ResourceSet adjusted_resource_request = - resources.ConvertRelativeResource(resource_request.GetResourceSet()); + resources.ConvertRelativeResources(resource_request.GetResourceSet()); return resources.available >= adjusted_resource_request; } diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index 42ab0d3ae4544..bf64d2495615c 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -81,7 +81,7 @@ bool LocalResourceManager::AllocateTaskResourceInstances( std::shared_ptr task_allocation) { RAY_CHECK(task_allocation != nullptr); const ResourceSet adjusted_resource_request = - local_resources_.ConvertRelativeResource(resource_request.GetResourceSet()); + local_resources_.ConvertRelativeResources(resource_request.GetResourceSet()); if (resource_request.GetResourceSet().Has(ResourceID::GPU_Memory()) && adjusted_resource_request.Get(ResourceID::GPU()) > 1) { return false; From cb438e5fa732e8f8cfcbd61719924aa3990532fc Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Fri, 24 Nov 2023 22:19:18 -0800 Subject: [PATCH 12/14] autoscaler fix Signed-off-by: Jonathan Nitisastro --- .../_private/fake_multi_node/node_provider.py | 2 +- .../_private/resource_demand_scheduler.py | 7 +++- .../tests/test_autoscaler_fake_multinode.py | 6 ++++ .../scheduling/cluster_resource_data.cc | 32 ++++++++++++++++++- .../common/scheduling/cluster_resource_data.h | 4 +-- .../scheduling/cluster_resource_manager.cc | 17 ++++++---- .../scheduling/local_resource_manager.cc | 9 +++--- src/ray/raylet/scheduling/policy/scorer.cc | 7 ++-- .../raylet/scheduling/scheduling_policy.cc | 3 +- 9 files changed, 68 insertions(+), 19 deletions(-) diff --git a/python/ray/autoscaler/_private/fake_multi_node/node_provider.py b/python/ray/autoscaler/_private/fake_multi_node/node_provider.py index deeb584e66522..076efa4bf1671 100644 --- a/python/ray/autoscaler/_private/fake_multi_node/node_provider.py +++ b/python/ray/autoscaler/_private/fake_multi_node/node_provider.py @@ -316,7 +316,7 @@ def create_node_with_resources_and_labels( num_cpus=resources.pop("CPU", 0), num_gpus=resources.pop("GPU", 0), object_store_memory=resources.pop("object_store_memory", None), - _gpu_memory=resources.pop("gpu_memory", 0), + _gpu_memory=resources.pop("gpu_memory", 0), # gpu memory = 600, resources=resources, labels=labels, redis_address="{}:6379".format( diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 4cfe3b07723c7..383af9d6002c2 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ 
b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -545,7 +545,12 @@ def add_node(node_type, available_resources=None): if TAG_RAY_USER_NODE_TYPE in tags: node_type = tags[TAG_RAY_USER_NODE_TYPE] ip = self.provider.internal_ip(node_id) - available_resources = unused_resources_by_ip.get(ip) + available_resources = copy.deepcopy(unused_resources_by_ip.get(ip)) + available = self.node_types[node_type]["resources"] + if available_resources and "node:gpu_memory_per_gpu" in available: + available_resources["node:gpu_memory_per_gpu"] = available[ + "node:gpu_memory_per_gpu" + ] add_node(node_type, available_resources) for node_type, count in pending_nodes.items(): diff --git a/python/ray/tests/test_autoscaler_fake_multinode.py b/python/ray/tests/test_autoscaler_fake_multinode.py index 9f8ee37c1e0cd..de79bc98ec24b 100644 --- a/python/ray/tests/test_autoscaler_fake_multinode.py +++ b/python/ray/tests/test_autoscaler_fake_multinode.py @@ -25,6 +25,7 @@ def test_fake_autoscaler_basic_e2e(shutdown_only): "CPU": 2, "GPU": 1, "object_store_memory": 1024 * 1024 * 1024, + "gpu_memory": 1000, }, "node_config": {}, "min_workers": 0, @@ -52,6 +53,11 @@ def test_fake_autoscaler_basic_e2e(shutdown_only): def f(): print("gpu ok") + # Triggers the addition of a GPU memory node. + @ray.remote(_gpu_memory=1000) + def f2(): + print("gpu memory ok") + # Triggers the addition of a CPU node. @ray.remote(num_cpus=3) def g(): diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc index 0bbe23510f210..15f8acdc5eefc 100644 --- a/src/ray/common/scheduling/cluster_resource_data.cc +++ b/src/ray/common/scheduling/cluster_resource_data.cc @@ -20,6 +20,27 @@ namespace ray { using namespace ::ray::scheduling; +NodeResources::NodeResources(const NodeResourceSet &resources) { + NodeResourceSet resources_adjusted = resources; + absl::flat_hash_map node_labels; + if (resources.Has(ResourceID::GPU_Memory())) { + // if gpu_memory is set, default GPU value is 1 + if (!resources.Has(ResourceID::GPU())) { + resources_adjusted.Set(ResourceID::GPU(), 1); + } + node_labels["_gpu_memory_per_gpu"] = + std::to_string(resources.Get(ResourceID::GPU_Memory()).Double() / + resources_adjusted.Get(ResourceID::GPU()).Double()); + resources_adjusted.Set(ResourceID::GPU_Memory(), 0); + } else { + node_labels["_gpu_memory_per_gpu"] = "0"; + } + + this->total = resources_adjusted; + this->available = resources_adjusted; + this->labels = node_labels; +} + /// Convert a map of resources to a ResourceRequest data structure. 
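The new `NodeResources` constructor above keeps `gpu_memory` out of the stored totals: it defaults `GPU` to 1 when only `gpu_memory` is declared and records the per-GPU value in a `_gpu_memory_per_gpu` label. A plain-dict sketch of that normalization (illustrative, not the C++ data structures).

    # Sketch of the node-side normalization performed by the NodeResources
    # constructor above; resource maps are plain dicts, values illustrative.
    from typing import Dict, Tuple


    def normalize_node_resources(
        total: Dict[str, float]
    ) -> Tuple[Dict[str, float], Dict[str, str]]:
        resources = dict(total)
        labels = {}
        if "gpu_memory" in resources:
            # A node that declares gpu_memory but no GPU count is treated as
            # having a single GPU.
            resources.setdefault("GPU", 1)
            labels["_gpu_memory_per_gpu"] = str(
                resources.pop("gpu_memory") / resources["GPU"]
            )
        else:
            labels["_gpu_memory_per_gpu"] = "0"
        return resources, labels


    print(normalize_node_resources({"CPU": 2, "gpu_memory": 1000}))
    # ({'CPU': 2, 'GPU': 1}, {'_gpu_memory_per_gpu': '1000.0'})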
ResourceRequest ResourceMapToResourceRequest( const absl::flat_hash_map &resource_map, @@ -59,8 +80,12 @@ NodeResources ResourceMapToNodeResources( auto node_labels_copy = node_labels; if (resource_map_total.find("gpu_memory") != resource_map_total.end()) { + // if gpu_memory is set, default GPU value is 1 + if (resource_map_total.find("GPU") == resource_map_total.end()) { + resource_map_total_copy["GPU"] = 1; + } node_labels_copy["_gpu_memory_per_gpu"] = std::to_string( - resource_map_total.at("gpu_memory") / resource_map_total.at("GPU")); + resource_map_total.at("gpu_memory") / resource_map_total_copy.at("GPU")); resource_map_total_copy.erase("gpu_memory"); resource_map_available_copy.erase("gpu_memory"); } else { @@ -108,6 +133,7 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request, } const ResourceSet resource_request_adjusted = this->ConvertRelativeResources(resource_request.GetResourceSet()); + if (!this->normal_task_resources.IsEmpty()) { auto available_resources = this->available; available_resources -= this->normal_task_resources; @@ -159,6 +185,8 @@ const ResourceSet NodeResources::ConvertRelativeResources( num_gpus_request = (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory_per_gpu) + 1 / static_cast(2 * kResourceUnitScaling); + } else { + return resource; } adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); adjusted_resource.Set(ResourceID::GPU_Memory(), 0); @@ -208,6 +236,8 @@ const ResourceSet NodeResourceInstances::ConvertRelativeResources( num_gpus_request = (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory_per_gpu) + 1 / static_cast(2 * kResourceUnitScaling); + } else { + return resource; } adjusted_resource.Set(ResourceID::GPU(), num_gpus_request); adjusted_resource.Set(ResourceID::GPU_Memory(), 0); diff --git a/src/ray/common/scheduling/cluster_resource_data.h b/src/ray/common/scheduling/cluster_resource_data.h index 97a4f6c3e9033..b88417062f8c8 100644 --- a/src/ray/common/scheduling/cluster_resource_data.h +++ b/src/ray/common/scheduling/cluster_resource_data.h @@ -289,8 +289,8 @@ class TaskResourceInstances { class NodeResources { public: NodeResources() {} - NodeResources(const NodeResourceSet &resources) - : total(resources), available(resources) {} + NodeResources(const NodeResourceSet &resources); + NodeResourceSet total; NodeResourceSet available; /// Only used by light resource report. diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc index f0a6afed6c84a..23138c0ddbff0 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.cc +++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc @@ -88,6 +88,7 @@ bool ClusterResourceManager::UpdateNode( local_view.total = node_resources.total; local_view.available = node_resources.available; + // might need label transfer here local_view.object_pulls_queued = resource_view_sync_message.object_pulls_queued(); // Update the idle duration for the node in terms of resources usage. 
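The hunks that follow route every request through `ConvertRelativeResources` before touching node availability, so the bookkeeping itself stays ordinary: subtract and clamp at zero on allocation, add back and cap at the node total on release, and reject converted requests that need more than one whole GPU. A small plain-dict sketch of that bookkeeping (illustrative only).

    # Sketch of the available-resource bookkeeping in the hunks below: the
    # request is converted first, then subtracted (clamped at zero) or
    # returned (capped at the node total).
    from typing import Dict


    def subtract_available(
        available: Dict[str, float], request: Dict[str, float]
    ) -> None:
        for resource, amount in request.items():
            available[resource] = max(available.get(resource, 0) - amount, 0)


    def add_available(
        available: Dict[str, float], total: Dict[str, float], returned: Dict[str, float]
    ) -> None:
        for resource, amount in returned.items():
            if resource in total:
                available[resource] = min(
                    available.get(resource, 0) + amount, total[resource]
                )


    total = {"CPU": 4, "GPU": 1}
    available = dict(total)
    subtract_available(available, {"GPU": 0.5})   # e.g. an adjusted gpu_memory request
    print(available)                              # {'CPU': 4, 'GPU': 0.5}
    add_available(available, total, {"GPU": 0.5})
    print(available)                              # {'CPU': 4, 'GPU': 1.0}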
@@ -187,10 +188,10 @@ bool ClusterResourceManager::SubtractNodeAvailableResources( } NodeResources *resources = it->second.GetMutableLocalView(); - const ResourceSet adjusted_resource_request = + const ResourceSet resource_request_adjusted = resources->ConvertRelativeResources(resource_request.GetResourceSet()); - resources->available -= adjusted_resource_request; + resources->available -= resource_request_adjusted; resources->available.RemoveNegative(); // TODO(swang): We should also subtract object store memory if the task has @@ -216,10 +217,10 @@ bool ClusterResourceManager::HasSufficientResource( return false; } - const ResourceSet adjusted_resource_request = + const ResourceSet resource_request_adjusted = resources.ConvertRelativeResources(resource_request.GetResourceSet()); - return resources.available >= adjusted_resource_request; + return resources.available >= resource_request_adjusted; } bool ClusterResourceManager::AddNodeAvailableResources(scheduling::NodeID node_id, @@ -228,13 +229,15 @@ bool ClusterResourceManager::AddNodeAvailableResources(scheduling::NodeID node_i if (it == nodes_.end()) { return false; } - auto node_resources = it->second.GetMutableLocalView(); - for (auto &resource_id : resource_set.ResourceIds()) { + const ResourceSet adjusted_resources = + node_resources->ConvertRelativeResources(resource_set); + + for (auto &resource_id : adjusted_resources.ResourceIds()) { if (node_resources->total.Has(resource_id)) { auto available = node_resources->available.Get(resource_id); auto total = node_resources->total.Get(resource_id); - auto new_available = available + resource_set.Get(resource_id); + auto new_available = available + adjusted_resources.Get(resource_id); if (new_available > total) { new_available = total; } diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index bf64d2495615c..c0ce2f679d898 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -80,17 +80,18 @@ bool LocalResourceManager::AllocateTaskResourceInstances( const ResourceRequest &resource_request, std::shared_ptr task_allocation) { RAY_CHECK(task_allocation != nullptr); - const ResourceSet adjusted_resource_request = + const ResourceSet resource_request_adjusted = local_resources_.ConvertRelativeResources(resource_request.GetResourceSet()); + if (resource_request.GetResourceSet().Has(ResourceID::GPU_Memory()) && - adjusted_resource_request.Get(ResourceID::GPU()) > 1) { + resource_request_adjusted.Get(ResourceID::GPU()) > 1) { return false; } // add adjust_gpu_memory here, added to NodeInstanceResourceSet - auto allocation = local_resources_.available.TryAllocate(adjusted_resource_request); + auto allocation = local_resources_.available.TryAllocate(resource_request_adjusted); if (allocation) { *task_allocation = TaskResourceInstances(*allocation); - for (const auto &resource_id : adjusted_resource_request.ResourceIds()) { + for (const auto &resource_id : resource_request_adjusted.ResourceIds()) { SetResourceNonIdle(resource_id); } return true; diff --git a/src/ray/raylet/scheduling/policy/scorer.cc b/src/ray/raylet/scheduling/policy/scorer.cc index b8c67f3d920d0..9df65e65b4926 100644 --- a/src/ray/raylet/scheduling/policy/scorer.cc +++ b/src/ray/raylet/scheduling/policy/scorer.cc @@ -35,8 +35,11 @@ double LeastResourceScorer::Score(const ResourceRequest &required_resources, } double node_score = 0.; - for (auto &resource_id : required_resources.ResourceIds()) 
{ - const auto &request_resource = required_resources.Get(resource_id); + const ResourceSet resource_request_adjusted = + node_resources_ptr->ConvertRelativeResources(required_resources.GetResourceSet()); + + for (auto &resource_id : resource_request_adjusted.ResourceIds()) { + const auto &request_resource = resource_request_adjusted.Get(resource_id); const auto &node_available_resource = node_resources_ptr->available.Get(resource_id); auto score = Calculate(request_resource, node_available_resource); if (score < 0.) { diff --git a/src/ray/raylet/scheduling/scheduling_policy.cc b/src/ray/raylet/scheduling/scheduling_policy.cc index 334c945e28de1..eeaac800fd5b0 100644 --- a/src/ray/raylet/scheduling/scheduling_policy.cc +++ b/src/ray/raylet/scheduling/scheduling_policy.cc @@ -24,7 +24,8 @@ namespace raylet_scheduling_policy { namespace { bool IsGPURequest(const ResourceRequest &resource_request) { - return resource_request.Has(ResourceID::GPU()); + return resource_request.Has(ResourceID::GPU()) || + resource_request.Has(ResourceID::GPU_Memory()); } bool DoesNodeHaveGPUs(const NodeResources &resources) { From 550170a62cc46cc501f3d88924660fa8b7260c7b Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Fri, 24 Nov 2023 22:23:23 -0800 Subject: [PATCH 13/14] aws node provider support Signed-off-by: Jonathan Nitisastro --- .../ray/_private/accelerators/accelerator.py | 18 ++++++++++++++++++ python/ray/_private/accelerators/nvidia_gpu.py | 14 ++++++++++++++ .../autoscaler/_private/aws/node_provider.py | 13 +++++++++++++ .../_private/resource_demand_scheduler.py | 11 +++++++---- python/ray/tests/test_autoscaler_yaml.py | 12 ++++++++++-- 5 files changed, 62 insertions(+), 6 deletions(-) diff --git a/python/ray/_private/accelerators/accelerator.py b/python/ray/_private/accelerators/accelerator.py index ae9f30908a851..7a7e12bd58a8c 100644 --- a/python/ray/_private/accelerators/accelerator.py +++ b/python/ray/_private/accelerators/accelerator.py @@ -146,3 +146,21 @@ def get_current_node_accelerator_memory() -> int: Return 0 if the current node doesn't contain accelerators of this family. """ return 0 + + @staticmethod + def get_ec2_instance_accelerator_memory( + instance_type: str, instances: dict + ) -> Optional[str]: + """Get the accelerator total memory of this family on the current node. + + Args: + instance_type: The ec2 instance type. + instances: Map from ec2 instance type to instance metadata returned by + ec2 `describe-instance-types`. + + Returns: + The accelerator total memory of this family in bytes on the ec2 instance + with given type. + Return None if it's unknown. + """ + return None diff --git a/python/ray/_private/accelerators/nvidia_gpu.py b/python/ray/_private/accelerators/nvidia_gpu.py index b2dbbc3e30751..1f74d864465ca 100644 --- a/python/ray/_private/accelerators/nvidia_gpu.py +++ b/python/ray/_private/accelerators/nvidia_gpu.py @@ -136,3 +136,17 @@ def get_current_node_accelerator_memory() -> int: cuda_device_memory = int(pynvml.nvmlDeviceGetMemoryInfo(handle).total) pynvml.nvmlShutdown() return cuda_device_memory + + @staticmethod + def get_ec2_instance_accelerator_memory( + instance_type: str, instances: dict + ) -> Optional[str]: + if instance_type not in instances: + return None + + gpus = instances[instance_type].get("GpuInfo", {}).get("Gpus") + if gpus is not None: + # TODO(ameer): currently we support one gpu type per node. 
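`get_ec2_instance_accelerator_memory` above reads `GpuInfo.Gpus[0].MemoryInfo.SizeInMiB` from the `describe-instance-types` metadata and, in the lines just below, converts MiB to bytes; the AWS node provider then multiplies by the GPU count to autodetect a node type's `gpu_memory`. A standalone sketch against an inline metadata dict shaped like that response; the p3.8xlarge numbers match the test fixture added later in this patch.

    # Sketch of the EC2 metadata lookup above; the instances dict mimics the
    # shape of `aws ec2 describe-instance-types` output.
    from typing import Optional

    INSTANCES = {
        "p3.8xlarge": {
            "GpuInfo": {
                "Gpus": [
                    {"Name": "V100", "Count": 4, "MemoryInfo": {"SizeInMiB": 16160}}
                ]
            }
        }
    }


    def ec2_gpu_memory_bytes(instance_type: str, instances: dict) -> Optional[int]:
        """Per-GPU memory in bytes for an EC2 instance type, or None if unknown."""
        gpus = instances.get(instance_type, {}).get("GpuInfo", {}).get("Gpus")
        if not gpus:
            return None
        return int(gpus[0]["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024


    per_gpu = ec2_gpu_memory_bytes("p3.8xlarge", INSTANCES)
    print(per_gpu)      # 16944988160 bytes per V100
    print(4 * per_gpu)  # 67779952640 bytes of gpu_memory for the whole node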
+ assert len(gpus) == 1 + return int(gpus[0]["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024 + return None diff --git a/python/ray/autoscaler/_private/aws/node_provider.py b/python/ray/autoscaler/_private/aws/node_provider.py index cd1e8676bd87d..11511ed86446a 100644 --- a/python/ray/autoscaler/_private/aws/node_provider.py +++ b/python/ray/autoscaler/_private/aws/node_provider.py @@ -662,6 +662,19 @@ def fillout_available_node_types_resources( autodetected_resources[ f"accelerator_type:{accelerator_type}" ] = 1 + # autodetect gpu memory + gpu_memory = ( + accelerator_manager.get_ec2_instance_accelerator_memory( + instance_type, instances_dict + ) + ) + if ( + accelerator_manager.get_resource_name() == "GPU" + and gpu_memory + ): + autodetected_resources["gpu_memory"] = ( + num_accelerators * gpu_memory + ) autodetected_resources.update( available_node_types[node_type].get("resources", {}) diff --git a/python/ray/autoscaler/_private/resource_demand_scheduler.py b/python/ray/autoscaler/_private/resource_demand_scheduler.py index 383af9d6002c2..57bb73488ca60 100644 --- a/python/ray/autoscaler/_private/resource_demand_scheduler.py +++ b/python/ray/autoscaler/_private/resource_demand_scheduler.py @@ -546,11 +546,14 @@ def add_node(node_type, available_resources=None): node_type = tags[TAG_RAY_USER_NODE_TYPE] ip = self.provider.internal_ip(node_id) available_resources = copy.deepcopy(unused_resources_by_ip.get(ip)) - available = self.node_types[node_type]["resources"] - if available_resources and "node:gpu_memory_per_gpu" in available: - available_resources["node:gpu_memory_per_gpu"] = available[ + available_node_type = self.node_types.get(node_type, {}) + if ( + available_resources + and "node:gpu_memory_per_gpu" in available_node_type["resources"] + ): + available_resources[ "node:gpu_memory_per_gpu" - ] + ] = available_node_type["resources"]["node:gpu_memory_per_gpu"] add_node(node_type, available_resources) for node_type, count in pending_nodes.items(): diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index b74383f8b3ab0..ede6f967e9d71 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -169,6 +169,7 @@ def testValidateDefaultConfigAWSMultiNodeTypes(self): "CPU": 32, "memory": 183395103539, "GPU": 4, + "gpu_memory": 4 * 16160 * 1024 * 1024, "accelerator_type:V100": 1, } expected_available_node_types["neuron_core_inf_1_ondemand"]["resources"] = { @@ -197,7 +198,15 @@ def testValidateDefaultConfigAWSMultiNodeTypes(self): "InstanceType": "p3.8xlarge", "VCpuInfo": {"DefaultVCpus": 32}, "MemoryInfo": {"SizeInMiB": 249856}, - "GpuInfo": {"Gpus": [{"Name": "V100", "Count": 4}]}, + "GpuInfo": { + "Gpus": [ + { + "Name": "V100", + "Count": 4, + "MemoryInfo": {"SizeInMiB": 16160}, + } + ] + }, }, { "InstanceType": "inf2.xlarge", @@ -221,7 +230,6 @@ def testValidateDefaultConfigAWSMultiNodeTypes(self): new_config = prepare_config(new_config) importer = _NODE_PROVIDERS.get(new_config["provider"]["type"]) provider_cls = importer(new_config["provider"]) - try: new_config = provider_cls.fillout_available_node_types_resources( new_config From 6b6ac323dddd22735ddb712c2b11bdcf2fba5c5d Mon Sep 17 00:00:00 2001 From: Jonathan Nitisastro Date: Wed, 6 Dec 2023 17:53:14 -0800 Subject: [PATCH 14/14] update ray start with gpu_memory as per gpu instead of total Signed-off-by: Jonathan Nitisastro --- python/ray/_private/resource_spec.py | 9 ++++----- python/ray/scripts/scripts.py | 6 +++--- 2 files changed, 7 insertions(+), 
8 deletions(-) diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 9d8457fa5b916..f908f076c3240 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -216,11 +216,10 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if accelerator_resource_name == "GPU": num_gpus = num_accelerators gpu_memory = ( - self.gpu_memory - if self.gpu_memory - else ( - num_accelerators - * accelerator_manager.get_current_node_accelerator_memory() + num_accelerators * ( + self.gpu_memory if self.gpu_memory + else + accelerator_manager.get_current_node_accelerator_memory() ) ) else: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 4673db2433ae3..da2cbd909f228 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -385,9 +385,9 @@ def debug(address): "--gpu-memory", required=False, type=int, - help="The total amount of memory (in bytes) to make available to workers. " - "By default, this is set to the sum of available memory " - "from the gpus detected gpus in the node.", + help="The amount of GPU memory per GPU (in bytes) to make available to workers. " + "By default, this is set to the available memory " + "from the detected gpus in the node.", ) @click.option( "--resources",
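The final patch makes `--gpu-memory` (and `RayParams.gpu_memory`) a per-GPU quantity: the resolved node total is the GPU count times the per-GPU value, falling back to the detected per-GPU memory when the flag is not given. A sketch of that resolution; `detected_per_gpu_bytes` stands in for the accelerator-manager call and the values are illustrative.

    # Sketch of the per-GPU resolution done in ResourceSpec.resolve above.
    from typing import Optional


    def resolve_total_gpu_memory(
        num_gpus: int,
        per_gpu_override_bytes: Optional[int],
        detected_per_gpu_bytes: int,
    ) -> int:
        """Total node gpu_memory = number of GPUs x per-GPU memory."""
        per_gpu = per_gpu_override_bytes or detected_per_gpu_bytes
        return num_gpus * per_gpu


    # `ray start --num-gpus=4 --gpu-memory=17179869184` (16 GiB per GPU) would
    # resolve to 4 * 16 GiB of schedulable gpu_memory on the node:
    print(resolve_total_gpu_memory(4, 16 * 1024**3, 16944988160))  # 68719476736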