Add _desired_num_gpus attribute to CudaDevicePlacementMixin #795

Merged · 3 commits · Jul 22, 2024
35 changes: 26 additions & 9 deletions src/distilabel/llms/mixins/cuda_device_placement.py
@@ -13,14 +13,17 @@
# limitations under the License.

import json
import logging
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Generator, List, Literal, Union

import portalocker
-from pydantic import BaseModel, Field, PrivateAttr
+from pydantic import BaseModel, Field, PositiveInt, PrivateAttr

from distilabel.mixins.runtime_parameters import RuntimeParameter

_CUDA_DEVICE_PLACEMENT_MIXIN_FILE = (
Path(tempfile.gettempdir()) / "distilabel_cuda_device_placement_mixin.json"
@@ -45,13 +48,17 @@ class CudaDevicePlacementMixin(BaseModel):
`LLM`.
"""

-# TODO: this should be a runtime parameter
-cuda_devices: Union[List[int], Literal["auto"]] = Field(default="auto")
+cuda_devices: RuntimeParameter[Union[List[int], Literal["auto"]]] = Field(
+    default="auto", description="A list with the ID of the CUDA devices to be used."
+)

_llm_identifier: Union[str, None] = PrivateAttr(default=None)
_desired_num_gpus: PositiveInt = PrivateAttr(default=1)
_available_cuda_devices: List[int] = PrivateAttr(default_factory=list)
_can_check_cuda_devices: bool = PrivateAttr(default=False)

_logger: Union[logging.Logger, None] = PrivateAttr(...)

def load(self) -> None:
"""Assign CUDA devices to the LLM based on the device placement information provided
in `_device_llm_placement_map`."""
@@ -83,7 +90,7 @@ def unload(self) -> None:
placement information provided in `_device_llm_placement_map`."""
with self._device_llm_placement_map() as device_map:
if self._llm_identifier in device_map:
-self._logger.debug(
+self._logger.debug(  # type: ignore
f"Removing '{self._llm_identifier}' from the CUDA device map file"
f" '{_CUDA_DEVICE_PLACEMENT_MIXIN_FILE}'."
)
@@ -124,7 +131,16 @@ def _assign_cuda_devices(self) -> None:
# Take the lock and read the device placement information for each LLM.
with self._device_llm_placement_map() as device_map:
if self.cuda_devices == "auto":
-self.cuda_devices = [self._get_cuda_device(device_map)]
+self.cuda_devices = []
+for _ in range(self._desired_num_gpus):
+    if (device_id := self._get_cuda_device(device_map)) is not None:
+        self.cuda_devices.append(device_id)
+        device_map[self._llm_identifier] = self.cuda_devices  # type: ignore
+if len(self.cuda_devices) != self._desired_num_gpus:
+    self._logger.warning(  # type: ignore
+        f"Could not assign the desired number of GPUs {self._desired_num_gpus}"
+        f" for LLM with identifier '{self._llm_identifier}'."
+    )
else:
self._check_cuda_devices(device_map)
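For reference, here is a minimal, dependency-free sketch of the greedy "auto" assignment this hunk introduces. The helper names, the 4-GPU pool, and the use of a plain dict in place of the locked JSON placement file are illustrative assumptions; the expected output mirrors the updated unit tests further down.

```python
from typing import Dict, List, Optional


def _first_free_device(
    device_map: Dict[str, List[int]], available: List[int]
) -> Optional[int]:
    """Return the first device not claimed by any LLM in the map, or None."""
    for device in available:
        if all(device not in devices for devices in device_map.values()):
            return device
    return None


def assign_auto(
    llm_id: str,
    desired_num_gpus: int,
    device_map: Dict[str, List[int]],
    available: List[int],
) -> List[int]:
    """Greedily claim up to `desired_num_gpus` devices, warning on a shortfall."""
    assigned: List[int] = []
    for _ in range(desired_num_gpus):
        if (device_id := _first_free_device(device_map, available)) is not None:
            assigned.append(device_id)
            device_map[llm_id] = assigned  # recorded immediately, as the mixin does
    if len(assigned) != desired_num_gpus:
        print(f"Could not assign the desired number of GPUs {desired_num_gpus} for '{llm_id}'.")
    return assigned


device_map: Dict[str, List[int]] = {}
print(assign_auto("unit-test-1", 3, device_map, [0, 1, 2, 3]))  # [0, 1, 2]
print(assign_auto("unit-test-2", 2, device_map, [0, 1, 2, 3]))  # [3], plus the warning
```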

@@ -143,17 +159,17 @@ def _check_cuda_devices(self, device_map: Dict[str, List[int]]) -> None:
Args:
device_map: a dictionary with the device placement information for each LLM.
"""
-for device in self.cuda_devices:
+for device in self.cuda_devices:  # type: ignore
for llm, devices in device_map.items():
if device in devices:
-self._logger.warning(
+self._logger.warning(  # type: ignore
f"LLM with identifier '{llm}' is also going to use CUDA device "
f"'{device}'. This may lead to performance issues or running out"
" of memory depending on the device capabilities and the loaded"
" models."
)

-def _get_cuda_device(self, device_map: Dict[str, List[int]]) -> int:
+def _get_cuda_device(self, device_map: Dict[str, List[int]]) -> Union[int, None]:
"""Returns the first available CUDA device to be used by the LLM that is not going
to be used by any other LLM.

@@ -170,6 +186,7 @@ def _get_cuda_device(self, device_map: Dict[str, List[int]]) -> int:
if all(device not in devices for devices in device_map.values()):
return device

return None
raise RuntimeError(
"Couldn't find an available CUDA device automatically to be used by the LLM"
f" '{self._llm_identifier}'. For forcing the use of a specific device, set the"
@@ -193,7 +210,7 @@ def _set_cuda_visible_devices(self) -> None:
)

cuda_devices = ",".join([str(device) for device in self.cuda_devices])
-self._logger.info(
+self._logger.info(  # type: ignore
f"🎮 LLM '{self._llm_identifier}' is going to use the following CUDA devices:"
f" {self.cuda_devices}."
)
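All of the methods touched above go through `_device_llm_placement_map`, the context manager that serialises access to the shared placement file defined at the top of the module. Below is a rough sketch of that lock-then-read-modify-write pattern, assuming `portalocker`'s advisory lock is used in the straightforward way; the helper name and the final print are illustrative, not the mixin's actual implementation.

```python
import json
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Generator, List

import portalocker

# Same default location as _CUDA_DEVICE_PLACEMENT_MIXIN_FILE above.
PLACEMENT_FILE = Path(tempfile.gettempdir()) / "distilabel_cuda_device_placement_mixin.json"


@contextmanager
def device_llm_placement_map() -> Generator[Dict[str, List[int]], None, None]:
    """Yield the shared LLM -> CUDA devices map and persist any changes on exit."""
    PLACEMENT_FILE.touch(exist_ok=True)
    with open(PLACEMENT_FILE, "r+") as f:
        portalocker.lock(f, portalocker.LOCK_EX)  # one process/step at a time
        try:
            content = f.read()
            device_map: Dict[str, List[int]] = json.loads(content) if content else {}
            yield device_map
            f.seek(0)
            f.truncate()
            f.write(json.dumps(device_map))
        finally:
            portalocker.unlock(f)


# After the two LLMs from the tests below have loaded, the file would hold
# something like {"unit-test-1": [0, 1, 2], "unit-test-2": [3]}.
with device_llm_placement_map() as device_map:
    print(device_map)
```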
1 change: 1 addition & 0 deletions src/distilabel/pipeline/step_wrapper.py
@@ -68,6 +68,7 @@ def __init__(
and isinstance(self.step.llm, CudaDevicePlacementMixin)
):
self.step.llm._llm_identifier = self.step.name
self.step.llm._desired_num_gpus = self.step.resources.gpus

def run(self) -> str:
"""The target function executed by the process. This function will also handle
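This one-line change is what connects a step's `StepResources(gpus=...)` request to the new `_desired_num_gpus` attribute on the LLM. Here is a sketch of how that request would look from a pipeline, assuming distilabel's `vLLM` wrapper, `StepResources`, and `TextGeneration` task; the model id and the `tensor_parallel_size` value are placeholders.

```python
from distilabel.llms import vLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts, StepResources
from distilabel.steps.tasks import TextGeneration

with Pipeline(name="cuda-placement-example") as pipeline:
    load_data = LoadDataFromDicts(data=[{"instruction": "Say hi."}])
    text_generation = TextGeneration(
        name="text_generation",
        llm=vLLM(
            model="my-org/my-model",  # placeholder model id
            extra_kwargs={"tensor_parallel_size": 2},  # vLLM itself shards over 2 GPUs
        ),
        # The step wrapper copies `gpus=2` into the LLM's `_desired_num_gpus`, so the
        # "auto" placement will try to reserve two free CUDA devices for this step.
        resources=StepResources(gpus=2),
    )
    load_data >> text_generation
```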
35 changes: 29 additions & 6 deletions tests/unit/llms/mixins/test_cuda_device_placement.py
@@ -95,19 +95,42 @@ def test_set_cuda_visible_devices_auto(self) -> None:
llm1.unload()
llm2.unload()

def test_set_cuda_visible_devices_auto_with_desired_num_gpus(self, caplog) -> None:
llm1 = DummyCudaLLM()
llm1._llm_identifier = "unit-test-1"
llm1._desired_num_gpus = 3
llm1.load()

assert os.environ["CUDA_VISIBLE_DEVICES"] == "0,1,2"

llm2 = DummyCudaLLM()
llm2._llm_identifier = "unit-test-2"
llm2._desired_num_gpus = 2
llm2.load()

assert os.environ["CUDA_VISIBLE_DEVICES"] == "3"
assert (
"Could not assign the desired number of GPUs 2 for LLM with identifier 'unit-test-2'"
in caplog.text
)

llm1.unload()
llm2.unload()

def test_set_cuda_visible_devices_auto_not_enough_devices(self) -> None:
llms = []
for i in range(5):
llm = DummyCudaLLM()
llm._llm_identifier = f"unit-test-{i}"
llms.append(llm)

-with pytest.raises(
-    RuntimeError, match="Couldn't find an available CUDA device"
-):
-    # 4 devices are available, but 5 LLMs are going to be loaded
-    for llm in llms:
-        llm.load()
+# 4 devices are available, but 5 LLMs are going to be loaded
+for i, llm in enumerate(llms):
+    llm.load()
+    if i == len(llms) - 1:
+        assert llm.cuda_devices == []
+    else:
+        assert llm.cuda_devices == [i]

for llm in llms:
llm.unload()
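Finally, because `cuda_devices` is now a `RuntimeParameter` (first hunk of this PR), placement can also be pinned explicitly instead of relying on "auto"; in that case `_assign_cuda_devices` only cross-checks the shared map and warns about collisions. A sketch assuming the same `vLLM` wrapper as above, with a placeholder model id:

```python
from distilabel.llms import vLLM

# Explicit placement: with `cuda_devices` pinned, the "auto" loop is skipped and
# `_check_cuda_devices` only warns if another LLM already claimed these devices.
llm = vLLM(
    model="my-org/my-model",  # placeholder model id
    cuda_devices=[0, 1],
)
```

Since the field is a runtime parameter, it should also be settable through `pipeline.run(parameters=...)` under the step's `llm` namespace, like any other LLM runtime parameter.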