diff --git a/python/ray/_private/accelerators/accelerator.py b/python/ray/_private/accelerators/accelerator.py
index 70178094e14cd..7a7e12bd58a8c 100644
--- a/python/ray/_private/accelerators/accelerator.py
+++ b/python/ray/_private/accelerators/accelerator.py
@@ -136,3 +136,31 @@ def get_ec2_instance_accelerator_type(
         Return None if it's unknown.
         """
         return None
+
+    @staticmethod
+    def get_current_node_accelerator_memory() -> int:
+        """Get the memory of a single accelerator of this family on the current node.
+
+        Returns:
+            The detected memory in bytes of one accelerator of this family.
+            Return 0 if the current node doesn't contain accelerators of this family.
+        """
+        return 0
+
+    @staticmethod
+    def get_ec2_instance_accelerator_memory(
+        instance_type: str, instances: dict
+    ) -> Optional[int]:
+        """Get the per-accelerator memory of this family on the given ec2 instance.
+
+        Args:
+            instance_type: The ec2 instance type.
+            instances: Map from ec2 instance type to instance metadata returned by
+                ec2 `describe-instance-types`.
+
+        Returns:
+            The memory in bytes of one accelerator of this family on the ec2
+            instance with the given type.
+            Return None if it's unknown.
+        """
+        return None
diff --git a/python/ray/_private/accelerators/intel_gpu.py b/python/ray/_private/accelerators/intel_gpu.py
index bd6f1c0fcbb14..c281c06a9ec07 100644
--- a/python/ray/_private/accelerators/intel_gpu.py
+++ b/python/ray/_private/accelerators/intel_gpu.py
@@ -101,3 +101,7 @@ def set_current_process_visible_accelerator_ids(
         os.environ[
             IntelGPUAcceleratorManager.get_visible_accelerator_ids_env_var()
         ] = prefix + ",".join([str(i) for i in visible_xpu_devices])
+
+    @staticmethod
+    def get_current_node_accelerator_memory() -> int:
+        return 0
diff --git a/python/ray/_private/accelerators/nvidia_gpu.py b/python/ray/_private/accelerators/nvidia_gpu.py
index 9d776221b9e2a..1f74d864465ca 100644
--- a/python/ray/_private/accelerators/nvidia_gpu.py
+++ b/python/ray/_private/accelerators/nvidia_gpu.py
@@ -122,3 +122,31 @@ def get_ec2_instance_accelerator_type(
             assert len(gpus) == 1
             return gpus[0]["Name"]
         return None
+
+    @staticmethod
+    def get_current_node_accelerator_memory() -> int:
+        try:
+            pynvml.nvmlInit()
+        except pynvml.NVMLError:
+            return 0  # pynvml init failed
+        device_count = pynvml.nvmlDeviceGetCount()
+        cuda_device_memory = 0
+        if device_count > 0:
+            handle = pynvml.nvmlDeviceGetHandleByIndex(0)
+            cuda_device_memory = int(pynvml.nvmlDeviceGetMemoryInfo(handle).total)
+        pynvml.nvmlShutdown()
+        return cuda_device_memory
+
+    @staticmethod
+    def get_ec2_instance_accelerator_memory(
+        instance_type: str, instances: dict
+    ) -> Optional[int]:
+        if instance_type not in instances:
+            return None
+
+        gpus = instances[instance_type].get("GpuInfo", {}).get("Gpus")
+        if gpus is not None:
+            # TODO(ameer): currently we support one gpu type per node.
+            assert len(gpus) == 1
+            return int(gpus[0]["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024
+        return None
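For reference, a minimal sketch of how another accelerator family could implement the two new hooks; `FakeAcceleratorManager` and its 16 GiB constant are hypothetical and only illustrate the expected units (bytes per accelerator, `0`/`None` when unknown):

```python
from typing import Optional


class FakeAcceleratorManager:
    """Hypothetical accelerator family implementing the new memory hooks."""

    # Assumed for illustration: 16 GiB of memory per accelerator.
    _MEMORY_PER_ACCELERATOR = 16 * 1024 * 1024 * 1024

    @staticmethod
    def get_current_node_accelerator_memory() -> int:
        # Real implementations detect this at runtime (NVIDIA uses pynvml above);
        # return 0 when no accelerator of this family is present.
        return FakeAcceleratorManager._MEMORY_PER_ACCELERATOR

    @staticmethod
    def get_ec2_instance_accelerator_memory(
        instance_type: str, instances: dict
    ) -> Optional[int]:
        # Mirrors the NVIDIA implementation: read per-GPU memory from the
        # `describe-instance-types` metadata and convert MiB to bytes.
        gpus = instances.get(instance_type, {}).get("GpuInfo", {}).get("Gpus")
        if gpus:
            return int(gpus[0]["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024
        return None
```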
diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py
index a40d61f4c5ac1..8141636476f85 100644
--- a/python/ray/_private/node.py
+++ b/python/ray/_private/node.py
@@ -507,6 +507,7 @@ def merge_resources(env_dict, params_dict):
             num_cpus = env_dict.pop("CPU", None)
             num_gpus = env_dict.pop("GPU", None)
             memory = env_dict.pop("memory", None)
+            gpu_memory = env_dict.pop("gpu_memory", None)
             object_store_memory = env_dict.pop("object_store_memory", None)
 
             result = params_dict.copy()
@@ -518,7 +519,7 @@ def merge_resources(env_dict, params_dict):
                         "Autoscaler is overriding your resource:"
                         f"{key}: {params_dict[key]} with {env_dict[key]}."
                     )
-            return num_cpus, num_gpus, memory, object_store_memory, result
+            return num_cpus, num_gpus, memory, gpu_memory, object_store_memory, result
 
         if not self._resource_spec:
             env_resources = {}
@@ -534,6 +535,7 @@ def merge_resources(env_dict, params_dict):
                 num_cpus,
                 num_gpus,
                 memory,
+                gpu_memory,
                 object_store_memory,
                 resources,
             ) = merge_resources(env_resources, self._ray_params.resources)
@@ -541,6 +543,7 @@ def merge_resources(env_dict, params_dict):
                 self._ray_params.num_cpus if num_cpus is None else num_cpus,
                 self._ray_params.num_gpus if num_gpus is None else num_gpus,
                 self._ray_params.memory if memory is None else memory,
+                self._ray_params._gpu_memory if gpu_memory is None else gpu_memory,
                 self._ray_params.object_store_memory
                 if object_store_memory is None
                 else object_store_memory,
diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py
index 1770dd2fa00d9..595344878a0e5 100644
--- a/python/ray/_private/parameter.py
+++ b/python/ray/_private/parameter.py
@@ -33,6 +33,7 @@ class RayParams:
             of that resource available.
         labels: The key-value labels of the node.
         memory: Total available memory for workers requesting memory.
+        _gpu_memory: The amount of GPU memory (in bytes) per GPU available to workers.
        object_store_memory: The amount of memory (in bytes) to start the object
            store with.
        redis_max_memory: The max amount of memory (in bytes) to allow redis
@@ -141,6 +142,7 @@ def __init__(
         resources: Optional[Dict[str, float]] = None,
         labels: Optional[Dict[str, str]] = None,
         memory: Optional[float] = None,
+        _gpu_memory: Optional[float] = None,
         object_store_memory: Optional[float] = None,
         redis_max_memory: Optional[float] = None,
         redis_port: Optional[int] = None,
@@ -198,6 +200,7 @@ def __init__(
         self.num_cpus = num_cpus
         self.num_gpus = num_gpus
         self.memory = memory
+        self._gpu_memory = _gpu_memory
         self.object_store_memory = object_store_memory
         self.resources = resources
         self.redis_max_memory = redis_max_memory
@@ -439,6 +442,9 @@ def build_error(resource, alternative):
         assert "CPU" not in self.resources, build_error("CPU", "num_cpus")
         assert "GPU" not in self.resources, build_error("GPU", "num_gpus")
         assert "memory" not in self.resources, build_error("memory", "memory")
+        assert "gpu_memory" not in self.resources, build_error(
+            "gpu_memory", "_gpu_memory"
+        )
         assert "object_store_memory" not in self.resources, build_error(
             "object_store_memory", "object_store_memory"
         )
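A short usage sketch of the node-level knob wired through RayParams above; the 16 GiB value is an assumption, and by default the per-GPU memory is autodetected:

```python
import ray

# Sketch: start a local node and override the autodetected per-GPU memory.
ray.init(num_gpus=1, _gpu_memory=16 * 1024 * 1024 * 1024)

# Passing it through the custom resources dict instead is rejected by the
# validation added above:
#     ray.init(resources={"gpu_memory": 16 * 1024**3})  # raises
```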
diff --git a/python/ray/_private/ray_option_utils.py b/python/ray/_private/ray_option_utils.py
index 56d998ace406d..e5b0a7672afdc 100644
--- a/python/ray/_private/ray_option_utils.py
+++ b/python/ray/_private/ray_option_utils.py
@@ -125,6 +125,7 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
     "name": Option((str, type(None))),
     "num_cpus": _resource_option("num_cpus"),
     "num_gpus": _resource_option("num_gpus"),
+    "_gpu_memory": _resource_option("_gpu_memory"),
     "object_store_memory": _counting_option("object_store_memory", False),
     # TODO(suquark): "placement_group", "placement_group_bundle_index"
     #     and "placement_group_capture_child_tasks" are deprecated,
diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py
index 8b7d3728b9f62..f908f076c3240 100644
--- a/python/ray/_private/resource_spec.py
+++ b/python/ray/_private/resource_spec.py
@@ -23,6 +23,7 @@ class ResourceSpec(
             "num_cpus",
             "num_gpus",
             "memory",
+            "gpu_memory",
             "object_store_memory",
             "resources",
             "redis_max_memory",
@@ -39,6 +40,7 @@ class ResourceSpec(
         num_cpus: The CPUs allocated for this raylet.
         num_gpus: The GPUs allocated for this raylet.
         memory: The memory allocated for this raylet.
+        gpu_memory: The total GPU memory allocated for this raylet.
         object_store_memory: The object store memory allocated for this raylet.
             Note that when calling to_resource_dict(), this will be scaled down
             by 30% to account for the global plasma LRU reserve.
@@ -55,6 +57,7 @@ def __new__(
         num_cpus=None,
         num_gpus=None,
         memory=None,
+        gpu_memory=None,
         object_store_memory=None,
         resources=None,
         redis_max_memory=None,
@@ -64,6 +67,7 @@ def __new__(
             num_cpus,
             num_gpus,
             memory,
+            gpu_memory,
             object_store_memory,
             resources,
             redis_max_memory,
@@ -89,6 +93,7 @@ def to_resource_dict(self):
             CPU=self.num_cpus,
             GPU=self.num_gpus,
             memory=int(self.memory),
+            gpu_memory=int(self.gpu_memory),
             object_store_memory=int(self.object_store_memory),
         )
@@ -141,6 +146,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
         assert "CPU" not in resources, resources
         assert "GPU" not in resources, resources
         assert "memory" not in resources, resources
+        assert "gpu_memory" not in resources, resources
         assert "object_store_memory" not in resources, resources
 
         if node_ip_address is None:
@@ -165,6 +171,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
         num_cpus = ray._private.utils.get_num_cpus()
 
         num_gpus = 0
+        gpu_memory = 0
         for (
             accelerator_resource_name
         ) in ray._private.accelerators.get_all_accelerator_resource_names():
@@ -208,6 +215,13 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
             if num_accelerators:
                 if accelerator_resource_name == "GPU":
                     num_gpus = num_accelerators
+                    gpu_memory = (
+                        num_accelerators * (
+                            self.gpu_memory if self.gpu_memory
+                            else
+                            accelerator_manager.get_current_node_accelerator_memory()
+                        )
+                    )
                 else:
                     resources[accelerator_resource_name] = num_accelerators
@@ -298,6 +312,7 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
             num_cpus,
             num_gpus,
             memory,
+            gpu_memory,
             object_store_memory,
             resources,
             redis_max_memory,
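The resolve() hunk above derives the raylet's advertised `gpu_memory` from the per-GPU figure; a small worked sketch of that arithmetic (the counts and sizes are assumptions):

```python
# Sketch of the computation in ResourceSpec.resolve().
num_accelerators = 4                          # GPUs detected on the node
per_gpu_memory = 16 * 1024 * 1024 * 1024      # bytes; autodetected, or the
                                              # _gpu_memory override if given
gpu_memory = num_accelerators * per_gpu_memory

assert gpu_memory == 64 * 1024 * 1024 * 1024  # advertised as "gpu_memory"
```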
diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py
index 72e205be84ec9..41551d79dd0b7 100644
--- a/python/ray/_private/utils.py
+++ b/python/ray/_private/utils.py
@@ -361,15 +361,20 @@ def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
         raise ValueError(
             "The resources dictionary must not contain the key 'CPU' or 'GPU'"
         )
-    elif "memory" in resources or "object_store_memory" in resources:
+    elif (
+        "memory" in resources
+        or "object_store_memory" in resources
+        or "_gpu_memory" in resources
+    ):
         raise ValueError(
-            "The resources dictionary must not "
-            "contain the key 'memory' or 'object_store_memory'"
+            "The resources dictionary must not contain the key"
+            " 'memory' or 'object_store_memory' or '_gpu_memory'"
        )
 
     num_cpus = options_dict.get("num_cpus")
     num_gpus = options_dict.get("num_gpus")
     memory = options_dict.get("memory")
+    gpu_memory = options_dict.get("_gpu_memory")
     object_store_memory = options_dict.get("object_store_memory")
     accelerator_type = options_dict.get("accelerator_type")
 
@@ -381,6 +386,8 @@ def resources_from_ray_options(options_dict: Dict[str, Any]) -> Dict[str, Any]:
         resources["memory"] = int(memory)
     if object_store_memory is not None:
         resources["object_store_memory"] = object_store_memory
+    if gpu_memory is not None:
+        resources["gpu_memory"] = int(gpu_memory)
     if accelerator_type is not None:
         resources[
             f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"
diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index f2dff8ba6351e..295b6f799ecc8 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -1157,6 +1157,7 @@ def init(
     resources: Optional[Dict[str, float]] = None,
     labels: Optional[Dict[str, str]] = None,
     object_store_memory: Optional[int] = None,
+    _gpu_memory: Optional[int] = None,
     local_mode: bool = False,
     ignore_reinit_error: bool = False,
     include_dashboard: Optional[bool] = None,
@@ -1287,6 +1288,8 @@ def init(
         _driver_object_store_memory: Deprecated.
         _memory: Amount of reservable memory resource in bytes rounded down to the
             nearest integer.
+        _gpu_memory: The amount of GPU memory (in bytes) per GPU to make available
+            to workers. By default this is autodetected from the node's GPUs.
         _redis_password: Prevents external clients without the password
             from connecting to Redis if provided.
         _temp_dir: If provided, specifies the root temporary
@@ -1562,6 +1565,7 @@ def sigterm_handler(signum, frame):
            dashboard_host=dashboard_host,
            dashboard_port=dashboard_port,
            memory=_memory,
+            _gpu_memory=_gpu_memory,
            object_store_memory=object_store_memory,
            redis_max_memory=_redis_max_memory,
            plasma_store_socket_name=None,
@@ -1590,6 +1594,11 @@ def sigterm_handler(signum, frame):
                "When connecting to an existing cluster, num_cpus "
                "and num_gpus must not be provided."
            )
+        if _gpu_memory is not None:
+            raise ValueError(
+                "When connecting to an existing cluster, "
+                "_gpu_memory must not be provided."
+            )
        if resources is not None:
            raise ValueError(
                "When connecting to an existing cluster, "
@@ -3127,6 +3136,7 @@ def remote(
     resources: Dict[str, float] = Undefined,
     accelerator_type: str = Undefined,
     memory: Union[int, float] = Undefined,
+    _gpu_memory: Union[int, float] = Undefined,
     max_calls: int = Undefined,
     max_restarts: int = Undefined,
     max_task_retries: int = Undefined,
@@ -3296,6 +3306,8 @@ def method(self):
            See :ref:`accelerator types `.
        memory: The heap memory request in bytes for this task/actor,
            rounded down to the nearest integer.
+        _gpu_memory: The GPU memory request in bytes for this task/actor
+            from a single GPU, rounded up to the nearest integer.
        max_calls: Only for *remote functions*. This specifies the
            maximum number of times that a given worker can execute
            the given remote function before it must exit
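A hedged usage sketch of the task-level request documented above (not taken from the PR's tests; the sizes assume a 16 GiB GPU). The raylet converts the byte request into a fractional `GPU` claim against the node's per-GPU memory:

```python
import ray

ray.init(num_gpus=1, _gpu_memory=16 * 1024 * 1024 * 1024)  # assumed 16 GiB GPU


# Request GPU memory instead of an explicit GPU fraction.
@ray.remote(_gpu_memory=4 * 1024 * 1024 * 1024)
def inference():
    return "scheduled against ~0.25 of one GPU"


print(ray.get(inference.remote()))

# Specifying both forms on one task is rejected:
#     @ray.remote(num_gpus=0.5, _gpu_memory=4 * 1024**3)  # ValueError
```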
diff --git a/python/ray/actor.py b/python/ray/actor.py
index a6adfd5e862a7..dd40daf5a59c7 100644
--- a/python/ray/actor.py
+++ b/python/ray/actor.py
@@ -350,6 +350,7 @@ class _ActorClassMetadata:
         num_gpus: The default number of GPUs required by the actor creation
             task.
         memory: The heap memory quota for this actor.
+        _gpu_memory: The GPU memory request in bytes for this actor.
         resources: The default resources required by the actor creation task.
         accelerator_type: The specified type of accelerator required for the
             node on which this actor runs.
@@ -376,6 +377,7 @@ def __init__(
         num_cpus,
         num_gpus,
         memory,
+        _gpu_memory,
         object_store_memory,
         resources,
         accelerator_type,
@@ -394,6 +396,7 @@ def __init__(
         self.num_cpus = num_cpus
         self.num_gpus = num_gpus
         self.memory = memory
+        self._gpu_memory = _gpu_memory
         self.object_store_memory = object_store_memory
         self.resources = resources
         self.accelerator_type = accelerator_type
@@ -595,6 +598,8 @@ def options(self, **actor_options):
                 See :ref:`accelerator types `.
             memory: The heap memory request in bytes for this task/actor,
                 rounded down to the nearest integer.
+            _gpu_memory: The GPU memory request in bytes for this task/actor
+                from a single GPU, rounded up to the nearest integer.
             object_store_memory: The object store memory request for actors only.
             max_restarts: This specifies the maximum
                 number of times that the actor should be restarted when it dies
@@ -718,6 +723,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
             num_cpus: The number of CPUs required by the actor creation task.
             num_gpus: The number of GPUs required by the actor creation task.
             memory: Restrict the heap memory usage of this actor.
+            _gpu_memory: Restrict the GPU memory usage of this actor.
             resources: The custom resources required by the actor creation task.
             max_concurrency: The max number of concurrent calls to allow for
@@ -1424,6 +1430,12 @@ def _make_actor(cls, actor_options):
     Class = _modify_class(cls)
     _inject_tracing_into_class(Class)
 
+    if actor_options.get("_gpu_memory", None) and actor_options.get("num_gpus", None):
+        raise ValueError(
+            "Specifying both `num_gpus` and `_gpu_memory` is not allowed. "
+            "See more at: (link TBD)"
+        )
+
     if "max_restarts" in actor_options:
         if actor_options["max_restarts"] != -1:  # -1 represents infinite restart
             # Make sure we don't pass too big of an int to C++, causing
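The same request shape applies to actors; a sketch under the same 16 GiB-per-GPU assumption, showing two actors sharing one GPU by memory:

```python
import ray

ray.init(num_gpus=1, _gpu_memory=16 * 1024 * 1024 * 1024)  # assumed 16 GiB GPU


@ray.remote(_gpu_memory=6 * 1024 * 1024 * 1024)
class Embedder:
    def ping(self):
        return "ok"


# Each actor claims 6 GiB of the 16 GiB GPU, so both fit on the single GPU.
actors = [Embedder.remote() for _ in range(2)]
print(ray.get([a.ping.remote() for a in actors]))

# Declaring both `num_gpus` and `_gpu_memory` on the class is rejected by
# _make_actor() above with a ValueError.
```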
diff --git a/python/ray/autoscaler/_private/aws/node_provider.py b/python/ray/autoscaler/_private/aws/node_provider.py
index cd1e8676bd87d..11511ed86446a 100644
--- a/python/ray/autoscaler/_private/aws/node_provider.py
+++ b/python/ray/autoscaler/_private/aws/node_provider.py
@@ -662,6 +662,19 @@ def fillout_available_node_types_resources(
                         autodetected_resources[
                             f"accelerator_type:{accelerator_type}"
                         ] = 1
+                    # autodetect gpu memory
+                    gpu_memory = (
+                        accelerator_manager.get_ec2_instance_accelerator_memory(
+                            instance_type, instances_dict
+                        )
+                    )
+                    if (
+                        accelerator_manager.get_resource_name() == "GPU"
+                        and gpu_memory
+                    ):
+                        autodetected_resources["gpu_memory"] = (
+                            num_accelerators * gpu_memory
+                        )
 
                 autodetected_resources.update(
                     available_node_types[node_type].get("resources", {})
diff --git a/python/ray/autoscaler/_private/fake_multi_node/node_provider.py b/python/ray/autoscaler/_private/fake_multi_node/node_provider.py
index 29fdc3d0490fa..076efa4bf1671 100644
--- a/python/ray/autoscaler/_private/fake_multi_node/node_provider.py
+++ b/python/ray/autoscaler/_private/fake_multi_node/node_provider.py
@@ -316,6 +316,7 @@ def create_node_with_resources_and_labels(
             num_cpus=resources.pop("CPU", 0),
             num_gpus=resources.pop("GPU", 0),
             object_store_memory=resources.pop("object_store_memory", None),
+            _gpu_memory=resources.pop("gpu_memory", 0),
             resources=resources,
             labels=labels,
             redis_address="{}:6379".format(
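For illustration, what an autoscaler node type ends up advertising once GPU memory is autodetected; the node-type name is hypothetical, and the p3.8xlarge figures (4 × V100, 16160 MiB each) come from the test fixture later in this diff:

```python
# Sketch of an available_node_types entry after
# fillout_available_node_types_resources() runs on AWS.
available_node_types = {
    "gpu_worker": {
        "node_config": {"InstanceType": "p3.8xlarge"},
        "resources": {
            "CPU": 32,
            "GPU": 4,
            # num_accelerators * per-GPU bytes, as computed above.
            "gpu_memory": 4 * 16160 * 1024 * 1024,
        },
        "min_workers": 0,
        "max_workers": 2,
    }
}
```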
""" self.provider = provider - self.node_types = copy.deepcopy(node_types) + self.node_types = self._adjust_node_types(copy.deepcopy(node_types)) self.node_resource_updated = set() self.max_workers = max_workers self.head_node_type = head_node_type self.upscaling_speed = upscaling_speed + def _adjust_node_types(self, node_types): + # update available_node_types gpu_memory to gpu_memory_per_gpu + for node_type, node_config in node_types.items(): + resources = node_config["resources"] + if "gpu_memory" in resources: + if "GPU" in resources and resources["GPU"] > 0: + resources["node:gpu_memory_per_gpu"] = ( + resources["gpu_memory"] / resources["GPU"] + ) + else: + resources["node:gpu_memory_per_gpu"] = 0 + del resources["gpu_memory"] + node_types[node_type] = node_config + return node_types + def is_feasible(self, bundle: ResourceDict) -> bool: for node_type, config in self.node_types.items(): max_of_type = config.get("max_workers", 0) @@ -372,6 +387,10 @@ def _update_node_resources_from_runtime( for key in ["CPU", "GPU", "memory", "object_store_memory"]: if key in runtime_resources: resources[key] = runtime_resources[key] + if "gpu_memory" in runtime_resources and "GPU" in runtime_resources: + resources["node:gpu_memory_per_gpu"] = int( + runtime_resources["gpu_memory"] + ) / int(runtime_resources["GPU"]) self.node_types[node_type]["resources"] = resources node_kind = tags[TAG_RAY_NODE_KIND] @@ -526,7 +545,15 @@ def add_node(node_type, available_resources=None): if TAG_RAY_USER_NODE_TYPE in tags: node_type = tags[TAG_RAY_USER_NODE_TYPE] ip = self.provider.internal_ip(node_id) - available_resources = unused_resources_by_ip.get(ip) + available_resources = copy.deepcopy(unused_resources_by_ip.get(ip)) + available_node_type = self.node_types.get(node_type, {}) + if ( + available_resources + and "node:gpu_memory_per_gpu" in available_node_type["resources"] + ): + available_resources[ + "node:gpu_memory_per_gpu" + ] = available_node_type["resources"]["node:gpu_memory_per_gpu"] add_node(node_type, available_resources) for node_type, count in pending_nodes.items(): @@ -823,7 +850,7 @@ def _resource_based_utilization_scorer( num_matching_resource_types = 0 for k, v in node_resources.items(): # Don't divide by zero. - if v < 1: + if v < 1 or k == "node::gpu_memory_per_gpu": # Could test v == 0 on the nose, but v < 1 feels safer. # (Note that node resources are integers.) continue @@ -931,8 +958,31 @@ def get_bin_pack_residual( return unfulfilled, nodes + used +def _convert_relative_resources( + node: ResourceDict, resources: ResourceDict +) -> Optional[ResourceDict]: + # return None if relative resources can't be converted + adjusted_resources = resources.copy() + if "gpu_memory" in resources: + if ( + "node:gpu_memory_per_gpu" not in node + or node["node:gpu_memory_per_gpu"] == 0 + ): + return None + adjusted_resources["GPU"] = ( + resources["gpu_memory"] / node["node:gpu_memory_per_gpu"] + ) + if adjusted_resources["GPU"] > 1.0: + return None + del adjusted_resources["gpu_memory"] + return adjusted_resources + + def _fits(node: ResourceDict, resources: ResourceDict) -> bool: - for k, v in resources.items(): + adjusted_resources = _convert_relative_resources(node, resources) + if adjusted_resources is None: + return False + for k, v in adjusted_resources.items(): # TODO(jjyao): Change ResourceDict to a class so we can # hide the implicit resource handling. 
diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py
index eaf446f3620c2..848a4d65c4a3e 100644
--- a/python/ray/cluster_utils.py
+++ b/python/ray/cluster_utils.py
@@ -85,6 +85,8 @@ def start(self, _system_config=None, override_env: Optional[Dict] = None):
                     self._head_resources.pop("object_store_memory")
                 )
             )
+        if "gpu_memory" in self._head_resources:
+            cmd.append("--gpu-memory={}".format(self._head_resources.pop("gpu_memory")))
         if self._head_resources:
             cmd.append("--resources='{}'".format(json.dumps(self._head_resources)))
         if _system_config is not None:
diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py
index 77212413ad150..7a0367eb1366b 100644
--- a/python/ray/remote_function.py
+++ b/python/ray/remote_function.py
@@ -56,6 +56,8 @@ class RemoteFunction:
         remote function.
     _memory: The heap memory request in bytes for this task/actor, rounded
         down to the nearest integer.
+    _gpu_memory: The GPU memory request in bytes for this task/actor
+        from a single GPU, rounded up to the nearest integer.
     _resources: The default custom resource requirements for invocations of
         this remote function.
     _num_returns: The default number of return values for invocations
@@ -110,6 +112,13 @@ def __init__(
         ) or "nsight" in (self._default_options.get("runtime_env") or {}):
             self._default_options["max_calls"] = 1
 
+        # Either num_gpus or _gpu_memory can be specified, not both.
+        if num_gpus != 0 and self._default_options.get("_gpu_memory"):
+            raise ValueError(
+                "Specifying both `num_gpus` and `_gpu_memory` is not allowed. "
+                "See more at: (link TBD)"
+            )
+
         # TODO(suquark): This is a workaround for class attributes of options.
         # They are being used in some other places, mostly tests. Need cleanup later.
         # E.g., actors uses "__ray_metadata__" to collect options, we can so something
@@ -177,6 +186,8 @@ def options(self, **task_options):
                 See :ref:`accelerator types `.
             memory: The heap memory request in bytes for this task/actor,
                 rounded down to the nearest integer.
+            _gpu_memory: The GPU memory request in bytes for this task/actor
+                from a single GPU, rounded up to the nearest integer.
             object_store_memory: The object store memory request for actors only.
             max_calls: This specifies the
                 maximum number of times that a given worker can execute
diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py
index 3af977374d41f..da2cbd909f228 100644
--- a/python/ray/scripts/scripts.py
+++ b/python/ray/scripts/scripts.py
@@ -381,6 +381,14 @@ def debug(address):
 @click.option(
     "--num-gpus", required=False, type=int, help="the number of GPUs on this node"
 )
+@click.option(
+    "--gpu-memory",
+    required=False,
+    type=int,
+    help="The amount of GPU memory per GPU (in bytes) to make available to workers. "
+    "By default, this is set to the available memory "
+    "of each GPU detected on the node.",
+)
 @click.option(
     "--resources",
     required=False,
@@ -564,6 +572,7 @@ def start(
     redis_max_memory,
     num_cpus,
     num_gpus,
+    gpu_memory,
     resources,
     head,
     include_dashboard,
@@ -656,6 +665,7 @@ def start(
         redirect_output=redirect_output,
         num_cpus=num_cpus,
         num_gpus=num_gpus,
+        _gpu_memory=gpu_memory,
         resources=resources,
         labels=labels_dict,
         autoscaling_config=autoscaling_config,
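A small sketch of how a head-node resource spec maps onto the new `--gpu-memory` flag that `cluster_utils.start()` emits; the resource values are assumptions:

```python
# Mirror of the cluster_utils logic: "gpu_memory" is lifted out of the
# resources dict and forwarded as its own ray start flag.
head_resources = {"CPU": 8, "GPU": 1, "gpu_memory": 16 * 1024 * 1024 * 1024}

cmd = ["ray", "start", "--head"]
if "gpu_memory" in head_resources:
    cmd.append("--gpu-memory={}".format(head_resources.pop("gpu_memory")))
# Remaining entries still travel through --resources as JSON.
print(cmd, head_resources)
```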
" + "By default, this is set to the available memory " + "from the detected gpus in the node.", +) @click.option( "--resources", required=False, @@ -564,6 +572,7 @@ def start( redis_max_memory, num_cpus, num_gpus, + gpu_memory, resources, head, include_dashboard, @@ -656,6 +665,7 @@ def start( redirect_output=redirect_output, num_cpus=num_cpus, num_gpus=num_gpus, + _gpu_memory=gpu_memory, resources=resources, labels=labels_dict, autoscaling_config=autoscaling_config, diff --git a/python/ray/tests/test_autoscaler_fake_multinode.py b/python/ray/tests/test_autoscaler_fake_multinode.py index 9f8ee37c1e0cd..de79bc98ec24b 100644 --- a/python/ray/tests/test_autoscaler_fake_multinode.py +++ b/python/ray/tests/test_autoscaler_fake_multinode.py @@ -25,6 +25,7 @@ def test_fake_autoscaler_basic_e2e(shutdown_only): "CPU": 2, "GPU": 1, "object_store_memory": 1024 * 1024 * 1024, + "gpu_memory": 1000, }, "node_config": {}, "min_workers": 0, @@ -52,6 +53,11 @@ def test_fake_autoscaler_basic_e2e(shutdown_only): def f(): print("gpu ok") + # Triggers the addition of a GPU memory node. + @ray.remote(_gpu_memory=1000) + def f2(): + print("gpu memory ok") + # Triggers the addition of a CPU node. @ray.remote(num_cpus=3) def g(): diff --git a/python/ray/tests/test_autoscaler_yaml.py b/python/ray/tests/test_autoscaler_yaml.py index b74383f8b3ab0..ede6f967e9d71 100644 --- a/python/ray/tests/test_autoscaler_yaml.py +++ b/python/ray/tests/test_autoscaler_yaml.py @@ -169,6 +169,7 @@ def testValidateDefaultConfigAWSMultiNodeTypes(self): "CPU": 32, "memory": 183395103539, "GPU": 4, + "gpu_memory": 4 * 16160 * 1024 * 1024, "accelerator_type:V100": 1, } expected_available_node_types["neuron_core_inf_1_ondemand"]["resources"] = { @@ -197,7 +198,15 @@ def testValidateDefaultConfigAWSMultiNodeTypes(self): "InstanceType": "p3.8xlarge", "VCpuInfo": {"DefaultVCpus": 32}, "MemoryInfo": {"SizeInMiB": 249856}, - "GpuInfo": {"Gpus": [{"Name": "V100", "Count": 4}]}, + "GpuInfo": { + "Gpus": [ + { + "Name": "V100", + "Count": 4, + "MemoryInfo": {"SizeInMiB": 16160}, + } + ] + }, }, { "InstanceType": "inf2.xlarge", @@ -221,7 +230,6 @@ def testValidateDefaultConfigAWSMultiNodeTypes(self): new_config = prepare_config(new_config) importer = _NODE_PROVIDERS.get(new_config["provider"]["type"]) provider_cls = importer(new_config["provider"]) - try: new_config = provider_cls.fillout_available_node_types_resources( new_config diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 969265cc64bb9..9c44a4202102b 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -718,6 +718,12 @@ RAY_CONFIG(std::string, predefined_unit_instance_resources, "GPU") /// When set it to "neuron_cores,TPU,FPGA", we will also treat FPGA as unit_instance. RAY_CONFIG(std::string, custom_unit_instance_resources, "neuron_cores,TPU") +/// The scheduler will treat these resource as resource which can be requested +/// but not stored as NodeResources. The main reason is the resource is different +/// representation of other resource stored in NodeResources. +/// For example: gpu_memory and GPU. +RAY_CONFIG(std::string, request_only_resources, "gpu_memory") + // Maximum size of the batches when broadcasting resources to raylet. 
diff --git a/src/ray/common/scheduling/cluster_resource_data.cc b/src/ray/common/scheduling/cluster_resource_data.cc
index 0e66316ba2808..15f8acdc5eefc 100644
--- a/src/ray/common/scheduling/cluster_resource_data.cc
+++ b/src/ray/common/scheduling/cluster_resource_data.cc
@@ -20,6 +20,27 @@ namespace ray {
 
 using namespace ::ray::scheduling;
 
+NodeResources::NodeResources(const NodeResourceSet &resources) {
+  NodeResourceSet resources_adjusted = resources;
+  absl::flat_hash_map<std::string, std::string> node_labels;
+  if (resources.Has(ResourceID::GPU_Memory())) {
+    // if gpu_memory is set, default GPU value is 1
+    if (!resources.Has(ResourceID::GPU())) {
+      resources_adjusted.Set(ResourceID::GPU(), 1);
+    }
+    node_labels["_gpu_memory_per_gpu"] =
+        std::to_string(resources.Get(ResourceID::GPU_Memory()).Double() /
+                       resources_adjusted.Get(ResourceID::GPU()).Double());
+    resources_adjusted.Set(ResourceID::GPU_Memory(), 0);
+  } else {
+    node_labels["_gpu_memory_per_gpu"] = "0";
+  }
+
+  this->total = resources_adjusted;
+  this->available = resources_adjusted;
+  this->labels = node_labels;
+}
+
 /// Convert a map of resources to a ResourceRequest data structure.
 ResourceRequest ResourceMapToResourceRequest(
     const absl::flat_hash_map<std::string, double> &resource_map,
@@ -54,9 +75,27 @@ NodeResources ResourceMapToNodeResources(
     const absl::flat_hash_map<std::string, double> &resource_map_available,
     const absl::flat_hash_map<std::string, std::string> &node_labels) {
   NodeResources node_resources;
-  node_resources.total = NodeResourceSet(resource_map_total);
-  node_resources.available = NodeResourceSet(resource_map_available);
-  node_resources.labels = node_labels;
+  auto resource_map_total_copy = resource_map_total;
+  auto resource_map_available_copy = resource_map_available;
+  auto node_labels_copy = node_labels;
+
+  if (resource_map_total.find("gpu_memory") != resource_map_total.end()) {
+    // if gpu_memory is set, default GPU value is 1
+    if (resource_map_total.find("GPU") == resource_map_total.end()) {
+      resource_map_total_copy["GPU"] = 1;
+    }
+    node_labels_copy["_gpu_memory_per_gpu"] = std::to_string(
+        resource_map_total.at("gpu_memory") / resource_map_total_copy.at("GPU"));
+    resource_map_total_copy.erase("gpu_memory");
+    resource_map_available_copy.erase("gpu_memory");
+  } else {
+    node_labels_copy["_gpu_memory_per_gpu"] = "0";
+  }
+
+  node_resources.total = NodeResourceSet(resource_map_total_copy);
+  node_resources.available = NodeResourceSet(resource_map_available_copy);
+  node_resources.labels = node_labels_copy;
+
   return node_resources;
 }
@@ -92,17 +131,21 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request,
     RAY_LOG(DEBUG) << "At pull manager capacity";
     return false;
   }
+  const ResourceSet resource_request_adjusted =
+      this->ConvertRelativeResources(resource_request.GetResourceSet());
   if (!this->normal_task_resources.IsEmpty()) {
     auto available_resources = this->available;
     available_resources -= this->normal_task_resources;
-    return available_resources >= resource_request.GetResourceSet();
+    return available_resources >= resource_request_adjusted;
   }
-  return this->available >= resource_request.GetResourceSet();
+  return this->available >= resource_request_adjusted;
 }
 
 bool NodeResources::IsFeasible(const ResourceRequest &resource_request) const {
-  return this->total >= resource_request.GetResourceSet();
+  const ResourceSet resource_request_adjusted =
+      this->ConvertRelativeResources(resource_request.GetResourceSet());
+  return this->total >= resource_request_adjusted;
 }
 
 bool NodeResources::operator==(const NodeResources &other) const {
@@ -126,6 +169,31 @@ std::string NodeResources::DebugString() const {
   return buffer.str();
 }
 
+const ResourceSet NodeResources::ConvertRelativeResources(
+    const ResourceSet &resource) const {
+  ResourceSet adjusted_resource = resource;
+  // convert gpu_memory to GPU
+  if (resource.Has(ResourceID::GPU_Memory())) {
+    double total_gpu_memory_per_gpu = 0;
+    if (this->labels.find("_gpu_memory_per_gpu") != this->labels.end()) {
+      // TODO: raise exception if this is not true
+      total_gpu_memory_per_gpu = std::stod(this->labels.at("_gpu_memory_per_gpu"));
+    }
+    double num_gpus_request = 0;
+    if (total_gpu_memory_per_gpu > 0) {
+      // round up to the closest kResourceUnitScaling
+      num_gpus_request =
+          (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory_per_gpu) +
+          1 / static_cast<double>(2 * kResourceUnitScaling);
+    } else {
+      return resource;
+    }
+    adjusted_resource.Set(ResourceID::GPU(), num_gpus_request);
+    adjusted_resource.Set(ResourceID::GPU_Memory(), 0);
+  }
+  return adjusted_resource;
+}
+
 std::string NodeResources::DictString() const { return DebugString(); }
 
 bool NodeResourceInstances::operator==(const NodeResourceInstances &other) {
@@ -153,4 +221,28 @@ const NodeResourceInstanceSet &NodeResourceInstances::GetTotalResourceInstances(
   return this->total;
 };
 
+const ResourceSet NodeResourceInstances::ConvertRelativeResources(
+    const ResourceSet &resource) const {
+  ResourceSet adjusted_resource = resource;
+  // convert gpu_memory to GPU
+  if (resource.Has(ResourceID::GPU_Memory())) {
+    double total_gpu_memory_per_gpu = 0;
+    if (this->labels.find("_gpu_memory_per_gpu") != this->labels.end()) {
+      total_gpu_memory_per_gpu = std::stod(this->labels.at("_gpu_memory_per_gpu"));
+    }
+    double num_gpus_request = 0;
+    if (total_gpu_memory_per_gpu > 0) {
+      // round up to the closest kResourceUnitScaling
+      num_gpus_request =
+          (resource.Get(ResourceID::GPU_Memory()).Double() / total_gpu_memory_per_gpu) +
+          1 / static_cast<double>(2 * kResourceUnitScaling);
+    } else {
+      return resource;
+    }
+    adjusted_resource.Set(ResourceID::GPU(), num_gpus_request);
+    adjusted_resource.Set(ResourceID::GPU_Memory(), 0);
+  }
+  return adjusted_resource;
+};
+
 } // namespace ray
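The half-unit term above compensates for the scheduler's fixed-point resource representation; a small numeric sketch, assuming `kResourceUnitScaling` is 10000 and a 16 GiB `_gpu_memory_per_gpu` label:

```python
# Sketch of NodeResources::ConvertRelativeResources() in Python terms.
K_RESOURCE_UNIT_SCALING = 10_000            # assumed value of the C++ constant
per_gpu_memory = 16 * 1024 * 1024 * 1024    # from the "_gpu_memory_per_gpu" label
gpu_memory_request = 6 * 1024 * 1024 * 1024

gpu_request = (
    gpu_memory_request / per_gpu_memory + 1 / (2 * K_RESOURCE_UNIT_SCALING)
)
print(gpu_request)  # ~0.37505: slightly above 6/16, so fixed-point
                    # truncation cannot undercut the requested memory
```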
diff --git a/src/ray/common/scheduling/cluster_resource_data.h b/src/ray/common/scheduling/cluster_resource_data.h
index 45312e012c0aa..b88417062f8c8 100644
--- a/src/ray/common/scheduling/cluster_resource_data.h
+++ b/src/ray/common/scheduling/cluster_resource_data.h
@@ -289,8 +289,8 @@ class TaskResourceInstances {
 class NodeResources {
  public:
   NodeResources() {}
-  NodeResources(const NodeResourceSet &resources)
-      : total(resources), available(resources) {}
+  NodeResources(const NodeResourceSet &resources);
+
   NodeResourceSet total;
   NodeResourceSet available;
   /// Only used by light resource report.
@@ -333,6 +333,9 @@ class NodeResources {
   std::string DebugString() const;
   /// Returns compact dict-like string.
   std::string DictString() const;
+  // Returns adjusted ResourceSet after converting resource relative to others.
+  // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory.
+  const ResourceSet ConvertRelativeResources(const ResourceSet &resource) const;
 };
 
 /// Total and available capacities of each resource instance.
@@ -351,6 +354,10 @@ class NodeResourceInstances {
   bool operator==(const NodeResourceInstances &other);
   /// Returns human-readable string for these resources.
   [[nodiscard]] std::string DebugString() const;
+
+  // Returns adjusted ResourceSet after converting resource relative to others.
+  // For example: gpu_memory => num_gpus = gpu_memory / total.gpu_memory.
+  const ResourceSet ConvertRelativeResources(const ResourceSet &resource) const;
 };
 
 struct Node {
diff --git a/src/ray/common/scheduling/scheduling_ids.cc b/src/ray/common/scheduling/scheduling_ids.cc
index 61faab47b1d27..fcf7092e3a583 100644
--- a/src/ray/common/scheduling/scheduling_ids.cc
+++ b/src/ray/common/scheduling/scheduling_ids.cc
@@ -113,6 +113,25 @@ absl::flat_hash_set<int64_t> &ResourceID::UnitInstanceResources() {
   return set;
 }
 
+absl::flat_hash_set<int64_t> &ResourceID::RequestOnlyResources() {
+  static absl::flat_hash_set<int64_t> set{[]() {
+    absl::flat_hash_set<int64_t> res;
+
+    std::string request_only_resources = RayConfig::instance().request_only_resources();
+    if (!request_only_resources.empty()) {
+      std::vector<std::string> results;
+      boost::split(results, request_only_resources, boost::is_any_of(","));
+      for (std::string &result : results) {
+        int64_t resource_id = ResourceID(result).ToInt();
+        res.insert(resource_id);
+      }
+    }
+
+    return res;
+  }()};
+  return set;
+}
+
 } // namespace scheduling
 } // namespace ray
diff --git a/src/ray/common/scheduling/scheduling_ids.h b/src/ray/common/scheduling/scheduling_ids.h
index 42799064c9cbf..abc2e88ffe069 100644
--- a/src/ray/common/scheduling/scheduling_ids.h
+++ b/src/ray/common/scheduling/scheduling_ids.h
@@ -36,6 +36,7 @@ enum PredefinedResourcesEnum {
   CPU,
   MEM,
   GPU,
+  GPU_MEM,
   OBJECT_STORE_MEM,
   PredefinedResourcesEnum_MAX
 };
@@ -44,6 +45,7 @@ const std::string kCPU_ResourceLabel = "CPU";
 const std::string kGPU_ResourceLabel = "GPU";
 const std::string kObjectStoreMemory_ResourceLabel = "object_store_memory";
 const std::string kMemory_ResourceLabel = "memory";
+const std::string kGPU_Memory_ResourceLabel = "gpu_memory";
 const std::string kBundle_ResourceLabel = "bundle";
 
 /// Class to map string IDs to unique integer IDs and back.
@@ -149,7 +151,8 @@ inline StringIdMap &BaseSchedulingID::GetMap() {
     map->InsertOrDie(kCPU_ResourceLabel, CPU)
         .InsertOrDie(kGPU_ResourceLabel, GPU)
         .InsertOrDie(kObjectStoreMemory_ResourceLabel, OBJECT_STORE_MEM)
-        .InsertOrDie(kMemory_ResourceLabel, MEM);
+        .InsertOrDie(kMemory_ResourceLabel, MEM)
+        .InsertOrDie(kGPU_Memory_ResourceLabel, GPU_MEM);
     return map;
   }()};
   return *map;
@@ -175,6 +178,8 @@ class ResourceID : public BaseSchedulingID {
     return !IsPredefinedResource() &&
            absl::StartsWith(Binary(), kImplicitResourcePrefix);
   }
+  bool IsRequestOnlyResource() const { return RequestOnlyResources().contains(id_); }
+
   /// Resource ID of CPU.
   static ResourceID CPU() { return ResourceID(PredefinedResourcesEnum::CPU); }
@@ -184,6 +189,9 @@ class ResourceID : public BaseSchedulingID {
   /// Resource ID of GPU.
   static ResourceID GPU() { return ResourceID(PredefinedResourcesEnum::GPU); }
 
+  /// Resource ID of GPU memory.
+  static ResourceID GPU_Memory() { return ResourceID(PredefinedResourcesEnum::GPU_MEM); }
+
   /// Resource ID of object store memory.
   static ResourceID ObjectStoreMemory() {
     return ResourceID(PredefinedResourcesEnum::OBJECT_STORE_MEM);
@@ -197,6 +205,9 @@ class ResourceID : public BaseSchedulingID {
  private:
   /// Return the IDs of all unit-instance resources.
   static absl::flat_hash_set<int64_t> &UnitInstanceResources();
+
+  /// Return the IDs of all request-only resources.
+  static absl::flat_hash_set<int64_t> &RequestOnlyResources();
 };
 
 } // namespace scheduling
 } // namespace ray
diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc
index 7be0d7430fb86..31c46330089ce 100644
--- a/src/ray/raylet/local_task_manager.cc
+++ b/src/ray/raylet/local_task_manager.cc
@@ -431,8 +431,10 @@ bool LocalTaskManager::PoppedWorkerHandler(
     const auto &required_resource =
         task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
     for (auto &entry : required_resource) {
+      scheduling::ResourceID resource_id(entry.first);
       if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
-              scheduling::ResourceID(entry.first))) {
+              resource_id) &&
+          !resource_id.IsRequestOnlyResource()) {
         RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first !=
                   PlacementGroupID::Nil());
         RAY_LOG(DEBUG) << "The placement group: "
diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc
index 9f2a007a128da..23138c0ddbff0 100644
--- a/src/ray/raylet/scheduling/cluster_resource_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc
@@ -88,6 +88,7 @@ bool ClusterResourceManager::UpdateNode(
   local_view.total = node_resources.total;
   local_view.available = node_resources.available;
+  // TODO: the node labels may also need to be transferred here.
   local_view.object_pulls_queued = resource_view_sync_message.object_pulls_queued();
 
   // Update the idle duration for the node in terms of resources usage.
@@ -187,8 +188,10 @@ bool ClusterResourceManager::SubtractNodeAvailableResources(
   }
   NodeResources *resources = it->second.GetMutableLocalView();
+  const ResourceSet resource_request_adjusted =
+      resources->ConvertRelativeResources(resource_request.GetResourceSet());
 
-  resources->available -= resource_request.GetResourceSet();
+  resources->available -= resource_request_adjusted;
   resources->available.RemoveNegative();
 
   // TODO(swang): We should also subtract object store memory if the task has
@@ -214,7 +217,10 @@ bool ClusterResourceManager::HasSufficientResource(
     return false;
   }
 
-  return resources.available >= resource_request.GetResourceSet();
+  const ResourceSet resource_request_adjusted =
+      resources.ConvertRelativeResources(resource_request.GetResourceSet());
+
+  return resources.available >= resource_request_adjusted;
 }
 
 bool ClusterResourceManager::AddNodeAvailableResources(scheduling::NodeID node_id,
@@ -223,13 +229,15 @@ bool ClusterResourceManager::AddNodeAvailableResources(scheduling::NodeID node_i
   if (it == nodes_.end()) {
     return false;
   }
-
   auto node_resources = it->second.GetMutableLocalView();
-  for (auto &resource_id : resource_set.ResourceIds()) {
+  const ResourceSet adjusted_resources =
+      node_resources->ConvertRelativeResources(resource_set);
+
+  for (auto &resource_id : adjusted_resources.ResourceIds()) {
     if (node_resources->total.Has(resource_id)) {
       auto available = node_resources->available.Get(resource_id);
       auto total = node_resources->total.Get(resource_id);
-      auto new_available = available + resource_set.Get(resource_id);
+      auto new_available = available + adjusted_resources.Get(resource_id);
       if (new_available > total) {
         new_available = total;
       }
diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc
index 25eb0eef0c658..c0ce2f679d898 100644
--- a/src/ray/raylet/scheduling/local_resource_manager.cc
+++ b/src/ray/raylet/scheduling/local_resource_manager.cc
@@ -80,11 +80,18 @@ bool LocalResourceManager::AllocateTaskResourceInstances(
     const ResourceRequest &resource_request,
     std::shared_ptr<TaskResourceInstances> task_allocation) {
   RAY_CHECK(task_allocation != nullptr);
-  auto allocation =
-      local_resources_.available.TryAllocate(resource_request.GetResourceSet());
+  const ResourceSet resource_request_adjusted =
+      local_resources_.ConvertRelativeResources(resource_request.GetResourceSet());
+
+  if (resource_request.GetResourceSet().Has(ResourceID::GPU_Memory()) &&
+      resource_request_adjusted.Get(ResourceID::GPU()) > 1) {
+    return false;
+  }
+  // Allocate against the adjusted request (gpu_memory converted to GPU).
+  auto allocation = local_resources_.available.TryAllocate(resource_request_adjusted);
   if (allocation) {
     *task_allocation = TaskResourceInstances(*allocation);
-    for (const auto &resource_id : resource_request.ResourceIds()) {
+    for (const auto &resource_id : resource_request_adjusted.ResourceIds()) {
       SetResourceNonIdle(resource_id);
     }
     return true;
diff --git a/src/ray/raylet/scheduling/policy/scorer.cc b/src/ray/raylet/scheduling/policy/scorer.cc
index b8c67f3d920d0..9df65e65b4926 100644
--- a/src/ray/raylet/scheduling/policy/scorer.cc
+++ b/src/ray/raylet/scheduling/policy/scorer.cc
@@ -35,8 +35,11 @@ double LeastResourceScorer::Score(const ResourceRequest &required_resources,
   }
 
   double node_score = 0.;
-  for (auto &resource_id : required_resources.ResourceIds()) {
-    const auto &request_resource = required_resources.Get(resource_id);
+  const ResourceSet resource_request_adjusted =
+      node_resources_ptr->ConvertRelativeResources(required_resources.GetResourceSet());
+
+  for (auto &resource_id : resource_request_adjusted.ResourceIds()) {
+    const auto &request_resource = resource_request_adjusted.Get(resource_id);
     const auto &node_available_resource = node_resources_ptr->available.Get(resource_id);
     auto score = Calculate(request_resource, node_available_resource);
     if (score < 0.) {
diff --git a/src/ray/raylet/scheduling/scheduling_policy.cc b/src/ray/raylet/scheduling/scheduling_policy.cc
index 334c945e28de1..eeaac800fd5b0 100644
--- a/src/ray/raylet/scheduling/scheduling_policy.cc
+++ b/src/ray/raylet/scheduling/scheduling_policy.cc
@@ -24,7 +24,8 @@ namespace raylet_scheduling_policy {
 namespace {
 
 bool IsGPURequest(const ResourceRequest &resource_request) {
-  return resource_request.Has(ResourceID::GPU());
+  return resource_request.Has(ResourceID::GPU()) ||
+         resource_request.Has(ResourceID::GPU_Memory());
 }
 
 bool DoesNodeHaveGPUs(const NodeResources &resources) {