Skip to content

Commit

Permalink
[FEA] Provide device_resources_manager for easy generation of device_…
Browse files Browse the repository at this point in the history
…resources (#1716)

Edit: I have renamed the object to raft::device_resources_manager on the (excellent) suggestion of @cjnolet. Substitute the name accordingly in the description below.

### Summary
This PR introduces a new utility (`raft::resource_manager`) to RAFT that is used to help downstream applications correctly generate the `raft::device_resources` they need to interact with the RAFT API.

### Purpose
As more vector search applications have begun integrating RAFT, it has become apparent that correctly managing CUDA resources like streams, stream pools, and device memory can be a challenge in a codebase that has previously focused exclusively on CPU execution. As a specific example, these applications are generally highly multi-threaded on the host. As they begin to use RAFT, they typically make use of the default `device_resources` constructor to generate the requisite `device_resources` object for the API. Because this default constructor makes use of the default stream per thread, the application uses as many streams as there are host threads. This behavior can lead to exhaustion of device resources because all of those host threads simultaneously launch work on independent streams.

In a CUDA-aware codebase, we might expect the application to manage its own limited pool of streams, but requiring this creates and unnecessary barrier for RAFT adoption. Instead, the `resource_manager` provides a straightforward method for limiting streams and other CUDA resources to sensible values in a highly multi-threaded application.

### Usage
To use the `resource_manager`, the host application will make use of setters which can provide control over various device resources. For instance, to limit the total number of streams used by the application to 16 per device, the application would call the following during its startup:
```
raft::resource_manager::set_streams_per_device(16);
```
After startup, if the application wishes to make a RAFT API call using a `raft::device_resources` object, it may call the following:
```
auto res = raft::resource_manager::get_device_resources();
some_raft_call(res);
```
If the same host thread calls `get_device_resources()` again in another function, it will retrieve a `device_resources` object based on the exact same stream it got with the previous call. This is similar in spirit to the way that the default CUDA stream per thread is used, but it draws from a limited pool of streams. This also means that while each host thread is associated with one CUDA stream, that stream may be associated with multiple host threads.

In addition to drawing the `device_resources` primary stream from a limited pool, we can share a pool of stream pools among host threads:
```
// Share 4 stream pools among host threads, with the same pool always assigned to any given thread
raft::resource_manager::set_stream_pools_per_device(4);
```

Besides streams and stream pools, the resource manager optionally allows initialization of an RMM memory pool for device allocations:
```
// Start the pool with only 2048 bytes
raft::resource_manager::set_init_mem_pool_size(2048);
// Allow the pool to use all available device memory
raft::resource_manager::set_max_mem_pool_size(std::nullopt);
```
For downstream consumers who know they want a memory pool but are not sure what sizes to pick, the following convenience function just sets up the memory pool with RMM defaults:
```
raft::resource_manager::set_mem_pool();
```
If no memory pool related options are set or if the maximum memory pool size is set to 0, no memory pool will be created or used. Furthermore, if the type of the current device memory resource is non-default, no memory pool will be created or used, and a warning will be emitted. We assume that if the application has already set a non-default memory resource, this was done intentionally and should not be overwritten.

### Design
This object is designed with the following priorities:

- Ease of use
- Thread safety
- Performance
- (Lastly) Access to as many `device_resources` options as possible

If a downstream application needs complete control over `device_resources` creation and memory resource initialization, that codebase is probably already CUDA-aware and will not benefit from the resource manager. Therefore, we do not insist that every possible configuration of resources be available through the manager. Nevertheless, codebases may grow more CUDA-aware with time, so we provide access to as many options as possible (with sensible defaults) to provide an on-ramp to more sophisticated manipulation of device resources.

In terms of performance, the goal is to make retrieval of `device_resources` as fast as possible and avoid blocking other host threads. To that end, the design of `resource_manager` includes a layer of indirection that ensures that each host thread needs to acquire a lock only on its first `get_device_resources` call for any given device. The `resource_manager` singleton maintains an internal configuration object that may be updated using the setter methods until the first call to `get_device_resources` on any thread. After that, a warning will be emitted and no changes will be made on any subsequent setter calls.

Within the `get_device_resources` call, each thread keeps track of which devices it has already retrieved resources for. If it has not yet retrieved resources for that device, it acquires a lock. It then marks the configuration as finalized and checks to see if any thread has initialized the shared resources for that device. If no other thread has, it initializes those resources itself. It then updates its own thread_local list of devices it has retrieved resources for so that it does not need to reacquire the lock on subsequent calls.

### Questions
~This PR is still a WIP. It is being posted here now primarily to gather feedback on its public API. A thorough review of the implementation can happen once tests have been added and it is moved out of WIP.~ Edit: This PR is now ready for review. Implementation feedback welcome!

One question I have is about two convenience functions I added right at the end: `synchronize_work_from_this_thread` and `synchronize_work_from_all_threads`. The idea behind these functions is that for the target audience of this feature, it may be helpful to provide synchronization helpers that clearly indicate what their execution mean in relation to the more familiar synchronization requirements of the host code. The first is intended to communicate that it is guaranteed to block host execution on this thread until all work submitted to the device from that thread is completed. The hope is that this can increase confidence around questions of synchronicity that developers unfamiliar with CUDA streams sometimes have. Because this helper is not strictly required for the core functionality of `resource_manager`, however, it would be useful to have feedback on whether others think it is worthwhile.

I am even less certain about `synchronize_work_from_all_threads`. The idea behind this is that it provides a way to block host thread execution until all work submitted on streams under the `resource_manager`'s control have completed. A natural question is why we would not just perform a device synchronization at that point. My justification for that is that the application may also be integrating other libraries which have their own stream management and synchronization infrastructure. In that case, it may be desirable _not_ to synchronize work submitted to the device by calls to the other library. It would be useful to hear if others think this might be useful for the target audience of this PR or if we should just push them toward a device synchronization to avoid unpleasant edge cases.

Authors:
  - William Hicks (https://github.com/wphicks)
  - Corey J. Nolet (https://github.com/cjnolet)

Approvers:
  - Allard Hendriksen (https://github.com/ahendriksen)
  - Corey J. Nolet (https://github.com/cjnolet)

URL: #1716
  • Loading branch information
wphicks authored Aug 15, 2023
1 parent 2c85c0b commit 7aded12
Show file tree
Hide file tree
Showing 8 changed files with 929 additions and 2 deletions.
606 changes: 606 additions & 0 deletions cpp/include/raft/core/device_resources_manager.hpp

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions cpp/include/raft/core/device_setter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once
#include <cuda_runtime_api.h>
#include <raft/core/logger.hpp>
#include <raft/util/cuda_rt_essentials.hpp>
namespace raft {

/**
* @brief A scoped setter for the active CUDA device
*
* On construction, the device_setter will set the active CUDA device to the
* indicated value. On deletion, the active CUDA device will be set back to
* its previous value. If the call to set the new active device fails, an
* exception will be thrown. If the call to set the device back to its
* previously selected value throws, an error will be logged, but no
* exception will be thrown.
*
* @param int device_id The ID of the CUDA device to make active
*
*/
struct device_setter {
/**
* Return the id of the current device as an integer
*/
static auto get_current_device()
{
auto result = int{};
RAFT_CUDA_TRY(cudaGetDevice(&result));
return result;
}
/**
* Return the count of currently available CUDA devices
*/
static auto get_device_count()
{
auto result = int{};
RAFT_CUDA_TRY(cudaGetDeviceCount(&result));
return result;
}

explicit device_setter(int new_device) : prev_device_{get_current_device()}
{
RAFT_CUDA_TRY(cudaSetDevice(new_device));
}
~device_setter() { RAFT_CUDA_TRY_NO_THROW(cudaSetDevice(prev_device_)); }

private:
int prev_device_;
};

} // namespace raft
19 changes: 19 additions & 0 deletions cpp/include/raft/core/logger-ext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,23 @@ class logger {
static inline std::unordered_map<std::string, std::shared_ptr<raft::logger>> log_map;
}; // class logger

/**
* @brief An object used for scoped log level setting
*
* Instances of `raft::log_level_setter` will set RAFT logging to the level
* indicated on construction and will revert to the previous set level on
* destruction.
*/
struct log_level_setter {
explicit log_level_setter(int level)
{
prev_level_ = logger::get(RAFT_NAME).get_level();
logger::get(RAFT_NAME).set_level(level);
}
~log_level_setter() { logger::get(RAFT_NAME).set_level(prev_level_); }

private:
int prev_level_;
}; // class log_level_setter

}; // namespace raft
3 changes: 1 addition & 2 deletions cpp/include/raft/util/reduction.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{})
* @param val input value
* @param idx index to be used as rank
* @param reduce_op a binary reduction operation.
* @return only the thread0 will contain valid reduced result
*/
template <typename T, typename ReduceLambda, typename i_t = int>
DI void warpRankedReduce(T& val, i_t& idx, ReduceLambda reduce_op = raft::min_op{})
Expand Down Expand Up @@ -199,4 +198,4 @@ DI i_t binaryBlockReduce(i_t val, i_t* shmem)
}
}

} // namespace raft
} // namespace raft
2 changes: 2 additions & 0 deletions cpp/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ if(BUILD_TESTS)
NAME
CORE_TEST
PATH
test/core/device_resources_manager.cpp
test/core/device_setter.cpp
test/core/logger.cpp
test/core/math_device.cu
test/core/math_host.cpp
Expand Down
146 changes: 146 additions & 0 deletions cpp/test/core/device_resources_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <array>
#include <cuda_runtime_api.h>
#include <gtest/gtest.h>
#include <mutex>
#include <omp.h>
#include <raft/core/device_resources_manager.hpp>
#include <raft/core/device_setter.hpp>
#include <raft/core/logger.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/limiting_resource_adaptor.hpp>
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>
#include <set>

namespace raft {
auto get_test_device_ids()
{
auto devices = std::array<int, 2>{int{}, int{}};
auto device_count = 0;
RAFT_CUDA_TRY(cudaGetDeviceCount(&device_count));
devices[1] = int{device_count > 1};
return devices;
}

TEST(DeviceResourcesManager, ObeysSetters)
{
auto devices = get_test_device_ids();

auto streams_per_device = 3;
auto pools_per_device = 3;
auto streams_per_pool = 7;
auto workspace_limit = 2048;
auto workspace_init = 1024;
device_resources_manager::set_streams_per_device(streams_per_device);
device_resources_manager::set_stream_pools_per_device(pools_per_device, streams_per_pool);
device_resources_manager::set_mem_pool();
device_resources_manager::set_workspace_allocation_limit(workspace_limit);

auto unique_streams = std::array<std::set<cudaStream_t>, 2>{};
auto unique_pools = std::array<std::set<rmm::cuda_stream_pool const*>, 2>{};

// Provide lock for counting unique objects
auto mtx = std::mutex{};
auto workspace_mrs =
std::array<std::shared_ptr<rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource>>, 2>{
nullptr, nullptr};
auto alternate_workspace_mrs = std::array<std::shared_ptr<rmm::mr::cuda_memory_resource>, 2>{};
auto upstream_mrs = std::array<rmm::mr::cuda_memory_resource*, 2>{
dynamic_cast<rmm::mr::cuda_memory_resource*>(
rmm::mr::get_per_device_resource(rmm::cuda_device_id{devices[0]})),
dynamic_cast<rmm::mr::cuda_memory_resource*>(
rmm::mr::get_per_device_resource(rmm::cuda_device_id{devices[1]}))};

for (auto i = std::size_t{}; i < devices.size(); ++i) {
auto scoped_device = device_setter{devices[i]};
if (upstream_mrs[i] == nullptr) {
RAFT_LOG_WARN(
"RMM memory resource already set. Tests for device_resources_manger will be incomplete.");
} else {
workspace_mrs[i] =
std::make_shared<rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource>>(
upstream_mrs[i], workspace_init, workspace_limit);
alternate_workspace_mrs[i] = std::make_shared<rmm::mr::cuda_memory_resource>();
}
}

device_resources_manager::set_workspace_memory_resource(workspace_mrs[0], devices[0]);
device_resources_manager::set_workspace_memory_resource(workspace_mrs[1], devices[1]);

// Suppress the many warnings from testing use of setters after initial
// get_device_resources call
auto scoped_log_level = log_level_setter{RAFT_LEVEL_ERROR};

omp_set_dynamic(0);
#pragma omp parallel for num_threads(5)
for (auto i = std::size_t{}; i < 101; ++i) {
thread_local auto prev_streams = std::array<std::optional<cudaStream_t>, 2>{};
auto device = devices[i % devices.size()];
auto const& res = device_resources_manager::get_device_resources(device);

auto primary_stream = res.get_stream().value();
prev_streams[device] = prev_streams[device].value_or(primary_stream);
// Expect to receive the same stream every time for a given thread
EXPECT_EQ(*prev_streams[device], primary_stream);

// Using RAII device setter here to avoid changing device in other tests
// that depend on a specific device to be set
auto scoped_device = device_setter{device};
auto const& res2 = device_resources_manager::get_device_resources();
// Expect device_resources to default to current device
EXPECT_EQ(primary_stream, res2.get_stream().value());

auto const& pool = res.get_stream_pool();
EXPECT_EQ(streams_per_pool, pool.get_pool_size());

auto* mr = dynamic_cast<rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource>*>(
rmm::mr::get_current_device_resource());
auto* workspace_mr =
dynamic_cast<rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource>*>(
dynamic_cast<rmm::mr::limiting_resource_adaptor<rmm::mr::device_memory_resource>*>(
res.get_workspace_resource())
->get_upstream());
if (upstream_mrs[i % devices.size()] != nullptr) {
// Expect that the current memory resource is a pool memory resource as requested
EXPECT_NE(mr, nullptr);
// Expect that the upstream workspace memory resource is a pool memory
// resource as requested
EXPECT_NE(workspace_mr, nullptr);
}

{
auto lock = std::unique_lock{mtx};
unique_streams[device].insert(primary_stream);
unique_pools[device].insert(&pool);
}
// Ensure that setters have no effect after get_device_resources call
device_resources_manager::set_streams_per_device(streams_per_device + 1);
device_resources_manager::set_stream_pools_per_device(pools_per_device - 1);
device_resources_manager::set_mem_pool();
device_resources_manager::set_workspace_allocation_limit(1024);
device_resources_manager::set_workspace_memory_resource(
alternate_workspace_mrs[i % devices.size()], devices[i % devices.size()]);
}

EXPECT_EQ(streams_per_device, unique_streams[devices[0]].size());
EXPECT_EQ(streams_per_device, unique_streams[devices[1]].size());
EXPECT_EQ(pools_per_device, unique_pools[devices[0]].size());
EXPECT_EQ(pools_per_device, unique_pools[devices[1]].size());
}

} // namespace raft
70 changes: 70 additions & 0 deletions cpp/test/core/device_setter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cuda_runtime_api.h>
#include <gtest/gtest.h>
#include <raft/core/device_setter.hpp>
#include <raft/core/logger.hpp>
#include <raft/util/cuda_rt_essentials.hpp>

namespace raft {
TEST(DeviceSetter, ScopedDevice)
{
auto device_a = int{};
auto device_b = int{device_setter::get_device_count() > 1};
if (device_b == device_a) {
RAFT_LOG_WARN("Only 1 CUDA device detected. device_setter test will be trivial");
}
auto initial_device = 0;
RAFT_CUDA_TRY(cudaGetDevice(&initial_device));
auto current_device = initial_device;
{
auto scoped_device = device_setter{device_a};
// Confirm that device is currently device_a
RAFT_CUDA_TRY(cudaGetDevice(&current_device));
EXPECT_EQ(current_device, device_a);
// Confirm that get_current_device reports expected device
EXPECT_EQ(current_device, device_setter::get_current_device());
}

// Confirm that device went back to initial value once setter was out of
// scope
RAFT_CUDA_TRY(cudaGetDevice(&current_device));
EXPECT_EQ(current_device, initial_device);

{
auto scoped_device = device_setter{device_b};
// Confirm that device is currently device_b
RAFT_CUDA_TRY(cudaGetDevice(&current_device));
EXPECT_EQ(current_device, device_b);
// Confirm that get_current_device reports expected device
EXPECT_EQ(current_device, device_setter::get_current_device());
}

// Confirm that device went back to initial value once setter was out of
// scope
RAFT_CUDA_TRY(cudaGetDevice(&current_device));
EXPECT_EQ(current_device, initial_device);

{
auto scoped_device1 = device_setter{device_b};
auto scoped_device2 = device_setter{device_a};
RAFT_CUDA_TRY(cudaGetDevice(&current_device));
// Confirm that multiple setters behave as expected, with the last
// constructed taking precedence
EXPECT_EQ(current_device, device_a);
}
}
} // namespace raft
19 changes: 19 additions & 0 deletions docs/source/cpp_api/core_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ namespace *raft::core*
:project: RAFT
:members:

Device Resources Manager
------------------------

While `raft::device_resources` provides a convenient way to access
device-related resources for a sequence of RAFT calls, it is sometimes useful
to be able to limit those resources across an entire application. For
instance, in highly multi-threaded applications, it can be helpful to limit
the total number of streams rather than relying on the default stream per
thread. `raft::device_resources_manager` offers a way to access
`raft::device_resources` instances that draw from a limited pool of
underlying device resources.

``#include <raft/core/device_resources_manager.hpp>``

namespace *raft::core*

.. doxygenclass:: raft::device_resources_manager
:project: RAFT
:members:

Resource Functions
------------------
Expand Down

0 comments on commit 7aded12

Please sign in to comment.