diff --git a/cpp/include/raft/core/device_resources_manager.hpp b/cpp/include/raft/core/device_resources_manager.hpp new file mode 100644 index 0000000000..ee4b151362 --- /dev/null +++ b/cpp/include/raft/core/device_resources_manager.hpp @@ -0,0 +1,606 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +namespace raft { + +/** + * @brief A singleton used to easily generate a raft::device_resources object + * + * Many calls to RAFT functions require a `raft::device_resources` object + * to provide CUDA resources like streams and stream pools. The + * `raft::device_resources_manager` singleton provides a straightforward method to create those + * objects in a way that allows consumers of RAFT to limit total consumption of device resources + * without actively managing streams or other CUDA-specific objects. + * + * To control the resources a consuming application will use, the + * resource manager provides setters for a variety of values. For + * instance, to ensure that no more than `N` CUDA streams are used per + * device, a consumer might call + * `raft::device_resources_manager::set_streams_per_device(N)`. Note that all of these + * setters must be used prior to retrieving the first `device_resources` from + * the manager. Setters invoked after this will log a warning but have no + * effect. + * + * After calling all desired setters, consumers can simply call + * `auto res = raft::device_resources_manager::get_device_resources();` to get a valid + * device_resources object for the current device based on previously-set + * parameters. Importantly, calling `get_device_resources()` again from the same + * thread is guaranteed to return a `device_resources` object with the same + * underlying CUDA stream and (if a non-zero number of stream pools has been + * requested) stream pool. + * + * Typical usage might look something like the following: + * @code + * void initialize_application() { + * raft::device_resources_manager::set_streams_per_device(16); + * } + * + * void foo_called_from_multiple_threads() { + * auto res = raft::device_resources_manager::get_device_resources(); + * // Call RAFT function using res + * res.sync_stream() // Ensure work completes before returning + * } + * @endcode + * + * Note that all public methods of the `device_resources_manager` are thread-safe, + * but the manager is designed to minimize locking required for + * retrieving `device_resources` objects. Each thread must acquire a lock + * exactly once per device when calling `get_device_resources`. Subsequent calls + * will still be thread-safe but will not require a lock. + * + * All public methods of the `device_resources_manager` are static. Please see + * documentation of those methods for additional usage information. + * + */ +struct device_resources_manager { + device_resources_manager(device_resources_manager const&) = delete; + void operator=(device_resources_manager const&) = delete; + + private: + device_resources_manager() {} + ~device_resources_manager() + { + // Ensure that we destroy any pool memory resources before CUDA context is + // lost + per_device_components_.clear(); + } + + // Get an id used to identify this thread for the purposes of assigning + // (in round-robin fashion) the same resources to the thread on subsequent calls to + // `get_device_resources` + static auto get_thread_id() + { + static std::atomic thread_counter{}; + thread_local std::size_t id = ++thread_counter; + return id; + } + + // This struct holds the various parameters used to control + // construction of the underlying resources shared by all + // `device_resources` objects returned by `get_device_resources` + struct resource_params { + // The total number of primary streams to be used by the + // application. If no value is provided, the default stream per thread + // is used. + std::optional stream_count{std::nullopt}; + // The total number of stream pools to be used by the application + std::size_t pool_count{}; + // How many streams to assign to each pool + std::size_t pool_size{rmm::cuda_stream_pool::default_size}; + // If a memory pool is requested (max_mem_pool_size is non-zero), use + // this initial size for the pool in bytes. Must be a multiple of 256. + // If nullopt, use half of the available memory on the current + // device. + thrust::optional init_mem_pool_size{thrust::nullopt}; + // If set to any non-zero value, create a memory pool with this + // maximum size. If nullopt, use up to the entire available memory of the + // device + thrust::optional max_mem_pool_size{std::size_t{}}; + // Limit on workspace memory for the returned device_resources object + std::optional workspace_allocation_limit{std::nullopt}; + // Optional specification of separate workspace memory resources for each + // device. The integer in each pair indicates the device for this memory + // resource. + std::vector, int>> workspace_mrs{}; + + auto get_workspace_memory_resource(int device_id) {} + } params_; + + // This struct stores the underlying resources to be shared among + // `device_resources` objects returned by this manager. + struct resource_components { + // Construct all underlying resources indicated by `params` for the + // indicated device. This includes primary streams, stream pools, and + // a memory pool if requested. + resource_components(int device_id, resource_params const& params) + : device_id_{device_id}, + streams_{[¶ms, this]() { + auto scoped_device = device_setter{device_id_}; + auto result = std::unique_ptr{nullptr}; + if (params.stream_count) { + result = std::make_unique(*params.stream_count); + } + return result; + }()}, + pools_{[¶ms, this]() { + auto scoped_device = device_setter{device_id_}; + auto result = std::vector>{}; + if (params.pool_size != 0) { + for (auto i = std::size_t{}; i < params.pool_count; ++i) { + result.push_back(std::make_shared(params.pool_size)); + } + } else if (params.pool_count != 0) { + RAFT_LOG_WARN("Stream pools of size 0 requested; no pools will be created"); + } + return result; + }()}, + pool_mr_{[¶ms, this]() { + auto scoped_device = device_setter{device_id_}; + auto result = + std::shared_ptr>{nullptr}; + // If max_mem_pool_size is nullopt or non-zero, create a pool memory + // resource + if (params.max_mem_pool_size.value_or(1) != 0) { + auto* upstream = + dynamic_cast(rmm::mr::get_current_device_resource()); + if (upstream != nullptr) { + result = + std::make_shared>( + upstream, params.init_mem_pool_size, params.max_mem_pool_size); + rmm::mr::set_current_device_resource(result.get()); + } else { + RAFT_LOG_WARN( + "Pool allocation requested, but other memory resource has already been set and " + "will not be overwritten"); + } + } + return result; + }()}, + workspace_mr_{[¶ms, this]() { + auto result = std::shared_ptr{nullptr}; + auto iter = std::find_if(std::begin(params.workspace_mrs), + std::end(params.workspace_mrs), + [this](auto&& pair) { return pair.second == device_id_; }); + if (iter != std::end(params.workspace_mrs)) { result = iter->first; } + return result; + }()} + { + } + + // Get the id of the device associated with the constructed resource + // components + [[nodiscard]] auto get_device_id() const { return device_id_; } + // Get the total number of streams available for this application + [[nodiscard]] auto stream_count() const + { + auto result = std::size_t{}; + if (streams_) { result = streams_->get_pool_size(); } + return result; + } + // Get the stream assigned to this host thread. Note that the same stream + // may be used by multiple threads, but any given thread will always use + // the same stream + [[nodiscard]] auto get_stream() const + { + auto result = rmm::cuda_stream_per_thread; + if (stream_count() != 0) { result = streams_->get_stream(get_thread_id() % stream_count()); } + return result; + } + // Get the total number of stream pools available for this + // application + [[nodiscard]] auto pool_count() const { return pools_.size(); } + // Get the stream pool assigned to this host thread. Note that the same stream pool + // may be used by multiple threads, but any given thread will always use + // the same stream pool + [[nodiscard]] auto get_pool() const + { + auto result = std::shared_ptr{nullptr}; + if (pool_count() != 0) { result = pools_[get_thread_id() % pool_count()]; } + return result; + } + // Return a (possibly null) shared_ptr to the pool memory resource + // created for this device by the manager + [[nodiscard]] auto get_pool_memory_resource() const { return pool_mr_; } + // Return the RAFT workspace allocation limit that will be used by + // `device_resources` returned from this manager + [[nodiscard]] auto get_workspace_allocation_limit() const + { + return workspace_allocation_limit_; + } + // Return a (possibly null) shared_ptr to the memory resource that will + // be used for workspace allocations by `device_resources` returned from + // this manager + [[nodiscard]] auto get_workspace_memory_resource() { return workspace_mr_; } + + private: + int device_id_; + std::unique_ptr streams_; + std::vector> pools_; + std::shared_ptr> pool_mr_; + std::shared_ptr workspace_mr_; + std::optional workspace_allocation_limit_{std::nullopt}; + }; + + // Mutex used to lock access to shared data until after the first + // `get_device_resources` call in each thread + mutable std::mutex manager_mutex_{}; + // Indicates whether or not `get_device_resources` has been called by any + // host thread + bool params_finalized_{}; + // Container for underlying device resources to be re-used across host + // threads for each device + std::vector per_device_components_; + // Container for device_resources objects shared among threads. The index + // of the outer vector is the thread id of the thread requesting resources + // modulo the total number of resources managed by this object. The inner + // vector contains all resources associated with that id across devices + // in any order. + std::vector> resources_{}; + + // Return a lock for accessing shared data + [[nodiscard]] auto get_lock() const { return std::unique_lock{manager_mutex_}; } + + // Retrieve the underlying resources to be shared across the + // application for the indicated device. This method acquires a lock the + // first time it is called in each thread for a specific device to ensure that the + // underlying resources have been correctly initialized exactly once across + // all host threads. + auto const& get_device_resources_(int device_id) + { + // Each thread maintains an independent list of devices it has + // accessed. If it has not marked a device as initialized, it + // acquires a lock to initialize it exactly once. This means that each + // thread will lock once for a particular device and not proceed until + // some thread has actually generated the corresponding device + // components + thread_local auto initialized_devices = std::vector{}; + auto res_iter = decltype(std::end(resources_[0])){}; + if (std::find(std::begin(initialized_devices), std::end(initialized_devices), device_id) == + std::end(initialized_devices)) { + // Only lock if we have not previously accessed this device on this + // thread + auto lock = get_lock(); + initialized_devices.push_back(device_id); + // If we are building components, do not allow any further changes to + // resource parameters. + params_finalized_ = true; + + if (resources_.empty()) { + // We will potentially need as many device_resources objects as there are combinations of + // streams and pools on a given device. + resources_.resize(std::max(params_.stream_count.value_or(1), std::size_t{1}) * + std::max(params_.pool_count, std::size_t{1})); + } + + auto res_idx = get_thread_id() % resources_.size(); + // Check to see if we have constructed device_resources for the + // requested device at the index assigned to this thread + res_iter = std::find_if(std::begin(resources_[res_idx]), + std::end(resources_[res_idx]), + [device_id](auto&& res) { return res.get_device() == device_id; }); + + if (res_iter == std::end(resources_[res_idx])) { + // Even if we have not yet built device_resources for the current + // device, we may have already built the underlying components, since + // multiple device_resources may point to the same components. + auto component_iter = std::find_if( + std::begin(per_device_components_), + std::end(per_device_components_), + [device_id](auto&& components) { return components.get_device_id() == device_id; }); + if (component_iter == std::end(per_device_components_)) { + // Build components for this device if we have not yet done so on + // another thread + per_device_components_.emplace_back(device_id, params_); + component_iter = std::prev(std::end(per_device_components_)); + } + auto scoped_device = device_setter(device_id); + // Build the device_resources object for this thread out of shared + // components + resources_[res_idx].emplace_back(component_iter->get_stream(), + component_iter->get_pool(), + component_iter->get_workspace_memory_resource(), + component_iter->get_workspace_allocation_limit()); + res_iter = std::prev(std::end(resources_[res_idx])); + } + } else { + auto res_idx = get_thread_id() % resources_.size(); + // If we have previously accessed this device on this thread, we do not + // need to lock. We know that this thread already initialized the + // resources it requires for this device if no other thread had already done so, so we simply + // retrieve the previously-generated resources. + res_iter = std::find_if(std::begin(resources_[res_idx]), + std::end(resources_[res_idx]), + [device_id](auto&& res) { return res.get_device() == device_id; }); + } + return *res_iter; + } + + // Thread-safe setter for the number of streams + void set_streams_per_device_(std::optional num_streams) + { + auto lock = get_lock(); + if (params_finalized_) { + RAFT_LOG_WARN( + "Attempted to set device_resources_manager properties after resources have already been " + "retrieved"); + } else { + params_.stream_count = num_streams; + } + } + + // Thread-safe setter for the number and size of stream pools + void set_stream_pools_per_device_(std::size_t num_pools, std::size_t num_streams) + { + auto lock = get_lock(); + if (params_finalized_) { + RAFT_LOG_WARN( + "Attempted to set device_resources_manager properties after resources have already been " + "retrieved"); + } else { + params_.pool_count = num_pools; + params_.pool_size = num_streams; + } + } + + // Thread-safe setter for the RAFT workspace allocation limit + void set_workspace_allocation_limit_(std::size_t memory_limit) + { + auto lock = get_lock(); + if (params_finalized_) { + RAFT_LOG_WARN( + "Attempted to set device_resources_manager properties after resources have already been " + "retrieved"); + } else { + params_.workspace_allocation_limit.emplace(memory_limit); + } + } + + // Thread-safe setter for the maximum memory pool size + void set_max_mem_pool_size_(std::optional memory_limit) + { + auto lock = get_lock(); + if (params_finalized_) { + RAFT_LOG_WARN( + "Attempted to set device_resources_manager properties after resources have already been " + "retrieved"); + } else { + if (memory_limit) { + params_.max_mem_pool_size.emplace(*memory_limit); + } else { + params_.max_mem_pool_size = thrust::nullopt; + } + } + } + + // Thread-safe setter for the initial memory pool size + void set_init_mem_pool_size_(std::optional init_memory) + { + auto lock = get_lock(); + if (params_finalized_) { + RAFT_LOG_WARN( + "Attempted to set device_resources_manager properties after resources have already been " + "retrieved"); + } else { + if (init_memory) { + params_.init_mem_pool_size.emplace(*init_memory); + } else { + params_.init_mem_pool_size = thrust::nullopt; + } + } + } + + // Thread-safe setter for workspace memory resources + void set_workspace_memory_resource_(std::shared_ptr mr, + int device_id) + { + auto lock = get_lock(); + if (params_finalized_) { + RAFT_LOG_WARN( + "Attempted to set device_resources_manager properties after resources have already been " + "retrieved"); + } else { + auto iter = std::find_if(std::begin(params_.workspace_mrs), + std::end(params_.workspace_mrs), + [device_id](auto&& pair) { return pair.second == device_id; }); + if (iter != std::end(params_.workspace_mrs)) { + iter->first = mr; + } else { + params_.workspace_mrs.emplace_back(mr, device_id); + } + } + } + + // Retrieve the instance of this singleton + static auto& get_manager() + { + static auto manager = device_resources_manager{}; + return manager; + } + + public: + /** + * @brief Retrieve device_resources to be used with the RAFT API + * + * This thread-safe method ensures that a `device_resources` object with + * the same underlying stream and stream pool is returned every time it is + * called by the same host thread. This means that if `get_device_resources` is + * used to provide all `device_resources` in an application, then + * `raft::get_device_resources().sync_stream()` and (if a stream pool is used) + * raft::get_device_resources().sync_stream_pool() are guaranteed to synchronize all + * work previously submitted to the device by this host thread. + * + * If the max memory pool size set with `set_max_mem_pool_size` is non-zero, + * the first call of this method will also create a memory pool to be used + * for all RMM-based allocations on device. + * + * @param device_id int If provided, the device for which resources should + * be returned. Defaults to active CUDA device. + */ + static auto const& get_device_resources(int device_id = device_setter::get_current_device()) + { + return get_manager().get_device_resources_(device_id); + } + + /** + * @brief Set the total number of CUDA streams to be used per device + * + * If nullopt, the default stream per thread will be used + * (essentially allowing as many streams as there are host threads). + * Otherwise, all returned `device_resources` will draw their streams from this + * limited pool. + * + * Limiting the total number of streams can be desirable for a number of + * reasons, but it is most often used in consuming applications to + * prevent a large number of host threads from flooding the device with + * simultaneous requests that may exhaust device memory or other + * resources. + * + * If called after the first call to + * `raft::device_resources_manager::get_device_resources`, no change will be made, + * and a warning will be emitted. + */ + static void set_streams_per_device(std::optional num_streams) + { + get_manager().set_streams_per_device_(num_streams); + } + + /** + * @brief Set the total number and size of CUDA stream pools to be used per device + * + * Setting the number of stream pools to a non-zero value will provide a + * pool of stream pools that can be shared among host threads. This can be + * useful for the same reason it is useful to limit the total number of + * primary streams assigned to `device_resoures` for each host thread. + * Repeated calls to `get_device_resources` on a given host thread are + * guaranteed to return `device_resources` with the same underlying stream + * pool. + * + * If called after the first call to + * `raft::device_resources_manager::get_device_resources`, no change will be made, + * and a warning will be emitted. + */ + static void set_stream_pools_per_device( + std::size_t num_pools, std::size_t num_streams = rmm::cuda_stream_pool::default_size) + { + get_manager().set_stream_pools_per_device_(num_pools, num_streams); + } + /** + * @brief Set the maximum size of temporary RAFT workspaces + * + * Note that this limits only the size of temporary workspace + * allocations. To cap the device memory generally available for all device + * allocations made with RMM, use + * `raft::device_manager::set_max_mem_pool_size` + * + * If called after the first call to + * `raft::device_resources_manager::get_device_resources`, no change will be made, + * and a warning will be emitted. + */ + static void set_workspace_allocation_limit(std::size_t memory_limit) + { + get_manager().set_workspace_allocation_limit_(memory_limit); + } + + /** + * @brief Set the maximum size of the device memory pool + * + * If set to 0, no memory pool will be used. If set to nullopt, the memory + * pool is allowed to grow to the size of available device memory. + * + * Note that the pool will not actually be created until the first call + * to `raft::device_manager::get_device_resources(device_id)`, after which it will become + * the current RMM device memory resource for the indicated device. If the + * current RMM device memory resource has already been set to some + * non-default resource, no pool resource will be created and a warning will be emitted. It is + * assumed that applications which have set a memory resource already wish to manage RMM + * themselves. + * + * If called after the first call to + * `raft::device_resources_manager::get_device_resources`, no change will be made, + * and a warning will be emitted. + */ + static void set_max_mem_pool_size(std::optional max_mem) + { + get_manager().set_max_mem_pool_size_(max_mem); + } + + /** + * @brief Set the initial size of the device memory pool + * + * If set to nullopt, the memory pool starts with half of the available + * device memory. + * + * If called after the first call to + * `raft::device_resources_manager::get_device_resources`, no change will be made, + * and a warning will be emitted. + */ + static void set_init_mem_pool_size(std::optional init_mem) + { + get_manager().set_init_mem_pool_size_(init_mem); + } + /** + * @brief Request a device memory pool with specified parameters + * + * This convenience method essentially combines + * `set_init_mem_pool_size` and `set_max_mem_pool_size`. It is provided + * primarily to allow users who want a memory pool but do not want to choose + * specific pool sizes to simply call + * `raft::device_manager::set_memory_pool()` and enable a memory pool using + * RMM defaults (initialize with half of available memory, allow to grow + * to all available memory). + * + * If called after the first call to + * `raft::device_resources_manager::get_device_resources`, no change will be made, + * and a warning will be emitted. + */ + static void set_mem_pool(std::optional init_mem = std::nullopt, + std::optional max_mem = std::nullopt) + { + set_init_mem_pool_size(init_mem); + set_max_mem_pool_size(max_mem); + } + + /** + * @brief Set the workspace memory resource to be used on a specific device + * + * RAFT device_resources objects can be built with a separate memory + * resource for allocating temporary workspaces. If a (non-nullptr) memory + * resource is provided by this setter, it will be used as the + * workspace memory resource for all `device_resources` returned for the + * indicated device. + * + * If called after the first call to + * `raft::device_resources_manager::get_device_resources`, no change will be made, + * and a warning will be emitted. + */ + static void set_workspace_memory_resource(std::shared_ptr mr, + int device_id = device_setter::get_current_device()) + { + get_manager().set_workspace_memory_resource_(mr, device_id); + } +}; +} // namespace raft diff --git a/cpp/include/raft/core/device_setter.hpp b/cpp/include/raft/core/device_setter.hpp new file mode 100644 index 0000000000..42049102aa --- /dev/null +++ b/cpp/include/raft/core/device_setter.hpp @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +namespace raft { + +/** + * @brief A scoped setter for the active CUDA device + * + * On construction, the device_setter will set the active CUDA device to the + * indicated value. On deletion, the active CUDA device will be set back to + * its previous value. If the call to set the new active device fails, an + * exception will be thrown. If the call to set the device back to its + * previously selected value throws, an error will be logged, but no + * exception will be thrown. + * + * @param int device_id The ID of the CUDA device to make active + * + */ +struct device_setter { + /** + * Return the id of the current device as an integer + */ + static auto get_current_device() + { + auto result = int{}; + RAFT_CUDA_TRY(cudaGetDevice(&result)); + return result; + } + /** + * Return the count of currently available CUDA devices + */ + static auto get_device_count() + { + auto result = int{}; + RAFT_CUDA_TRY(cudaGetDeviceCount(&result)); + return result; + } + + explicit device_setter(int new_device) : prev_device_{get_current_device()} + { + RAFT_CUDA_TRY(cudaSetDevice(new_device)); + } + ~device_setter() { RAFT_CUDA_TRY_NO_THROW(cudaSetDevice(prev_device_)); } + + private: + int prev_device_; +}; + +} // namespace raft diff --git a/cpp/include/raft/core/logger-ext.hpp b/cpp/include/raft/core/logger-ext.hpp index 8fd29cf1d6..04a6a4d060 100644 --- a/cpp/include/raft/core/logger-ext.hpp +++ b/cpp/include/raft/core/logger-ext.hpp @@ -129,4 +129,23 @@ class logger { static inline std::unordered_map> log_map; }; // class logger +/** + * @brief An object used for scoped log level setting + * + * Instances of `raft::log_level_setter` will set RAFT logging to the level + * indicated on construction and will revert to the previous set level on + * destruction. + */ +struct log_level_setter { + explicit log_level_setter(int level) + { + prev_level_ = logger::get(RAFT_NAME).get_level(); + logger::get(RAFT_NAME).set_level(level); + } + ~log_level_setter() { logger::get(RAFT_NAME).set_level(prev_level_); } + + private: + int prev_level_; +}; // class log_level_setter + }; // namespace raft diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh index 74c57b4ca2..362396f9b8 100644 --- a/cpp/include/raft/util/reduction.cuh +++ b/cpp/include/raft/util/reduction.cuh @@ -108,7 +108,6 @@ DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{}) * @param val input value * @param idx index to be used as rank * @param reduce_op a binary reduction operation. - * @return only the thread0 will contain valid reduced result */ template DI void warpRankedReduce(T& val, i_t& idx, ReduceLambda reduce_op = raft::min_op{}) @@ -199,4 +198,4 @@ DI i_t binaryBlockReduce(i_t val, i_t* shmem) } } -} // namespace raft \ No newline at end of file +} // namespace raft diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index efcd48cd1d..deefd9644a 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -105,6 +105,8 @@ if(BUILD_TESTS) NAME CORE_TEST PATH + test/core/device_resources_manager.cpp + test/core/device_setter.cpp test/core/logger.cpp test/core/math_device.cu test/core/math_host.cpp diff --git a/cpp/test/core/device_resources_manager.cpp b/cpp/test/core/device_resources_manager.cpp new file mode 100644 index 0000000000..11d07e3c7b --- /dev/null +++ b/cpp/test/core/device_resources_manager.cpp @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace raft { +auto get_test_device_ids() +{ + auto devices = std::array{int{}, int{}}; + auto device_count = 0; + RAFT_CUDA_TRY(cudaGetDeviceCount(&device_count)); + devices[1] = int{device_count > 1}; + return devices; +} + +TEST(DeviceResourcesManager, ObeysSetters) +{ + auto devices = get_test_device_ids(); + + auto streams_per_device = 3; + auto pools_per_device = 3; + auto streams_per_pool = 7; + auto workspace_limit = 2048; + auto workspace_init = 1024; + device_resources_manager::set_streams_per_device(streams_per_device); + device_resources_manager::set_stream_pools_per_device(pools_per_device, streams_per_pool); + device_resources_manager::set_mem_pool(); + device_resources_manager::set_workspace_allocation_limit(workspace_limit); + + auto unique_streams = std::array, 2>{}; + auto unique_pools = std::array, 2>{}; + + // Provide lock for counting unique objects + auto mtx = std::mutex{}; + auto workspace_mrs = + std::array>, 2>{ + nullptr, nullptr}; + auto alternate_workspace_mrs = std::array, 2>{}; + auto upstream_mrs = std::array{ + dynamic_cast( + rmm::mr::get_per_device_resource(rmm::cuda_device_id{devices[0]})), + dynamic_cast( + rmm::mr::get_per_device_resource(rmm::cuda_device_id{devices[1]}))}; + + for (auto i = std::size_t{}; i < devices.size(); ++i) { + auto scoped_device = device_setter{devices[i]}; + if (upstream_mrs[i] == nullptr) { + RAFT_LOG_WARN( + "RMM memory resource already set. Tests for device_resources_manger will be incomplete."); + } else { + workspace_mrs[i] = + std::make_shared>( + upstream_mrs[i], workspace_init, workspace_limit); + alternate_workspace_mrs[i] = std::make_shared(); + } + } + + device_resources_manager::set_workspace_memory_resource(workspace_mrs[0], devices[0]); + device_resources_manager::set_workspace_memory_resource(workspace_mrs[1], devices[1]); + + // Suppress the many warnings from testing use of setters after initial + // get_device_resources call + auto scoped_log_level = log_level_setter{RAFT_LEVEL_ERROR}; + + omp_set_dynamic(0); +#pragma omp parallel for num_threads(5) + for (auto i = std::size_t{}; i < 101; ++i) { + thread_local auto prev_streams = std::array, 2>{}; + auto device = devices[i % devices.size()]; + auto const& res = device_resources_manager::get_device_resources(device); + + auto primary_stream = res.get_stream().value(); + prev_streams[device] = prev_streams[device].value_or(primary_stream); + // Expect to receive the same stream every time for a given thread + EXPECT_EQ(*prev_streams[device], primary_stream); + + // Using RAII device setter here to avoid changing device in other tests + // that depend on a specific device to be set + auto scoped_device = device_setter{device}; + auto const& res2 = device_resources_manager::get_device_resources(); + // Expect device_resources to default to current device + EXPECT_EQ(primary_stream, res2.get_stream().value()); + + auto const& pool = res.get_stream_pool(); + EXPECT_EQ(streams_per_pool, pool.get_pool_size()); + + auto* mr = dynamic_cast*>( + rmm::mr::get_current_device_resource()); + auto* workspace_mr = + dynamic_cast*>( + dynamic_cast*>( + res.get_workspace_resource()) + ->get_upstream()); + if (upstream_mrs[i % devices.size()] != nullptr) { + // Expect that the current memory resource is a pool memory resource as requested + EXPECT_NE(mr, nullptr); + // Expect that the upstream workspace memory resource is a pool memory + // resource as requested + EXPECT_NE(workspace_mr, nullptr); + } + + { + auto lock = std::unique_lock{mtx}; + unique_streams[device].insert(primary_stream); + unique_pools[device].insert(&pool); + } + // Ensure that setters have no effect after get_device_resources call + device_resources_manager::set_streams_per_device(streams_per_device + 1); + device_resources_manager::set_stream_pools_per_device(pools_per_device - 1); + device_resources_manager::set_mem_pool(); + device_resources_manager::set_workspace_allocation_limit(1024); + device_resources_manager::set_workspace_memory_resource( + alternate_workspace_mrs[i % devices.size()], devices[i % devices.size()]); + } + + EXPECT_EQ(streams_per_device, unique_streams[devices[0]].size()); + EXPECT_EQ(streams_per_device, unique_streams[devices[1]].size()); + EXPECT_EQ(pools_per_device, unique_pools[devices[0]].size()); + EXPECT_EQ(pools_per_device, unique_pools[devices[1]].size()); +} + +} // namespace raft diff --git a/cpp/test/core/device_setter.cpp b/cpp/test/core/device_setter.cpp new file mode 100644 index 0000000000..5a4ff01346 --- /dev/null +++ b/cpp/test/core/device_setter.cpp @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include + +namespace raft { +TEST(DeviceSetter, ScopedDevice) +{ + auto device_a = int{}; + auto device_b = int{device_setter::get_device_count() > 1}; + if (device_b == device_a) { + RAFT_LOG_WARN("Only 1 CUDA device detected. device_setter test will be trivial"); + } + auto initial_device = 0; + RAFT_CUDA_TRY(cudaGetDevice(&initial_device)); + auto current_device = initial_device; + { + auto scoped_device = device_setter{device_a}; + // Confirm that device is currently device_a + RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); + EXPECT_EQ(current_device, device_a); + // Confirm that get_current_device reports expected device + EXPECT_EQ(current_device, device_setter::get_current_device()); + } + + // Confirm that device went back to initial value once setter was out of + // scope + RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); + EXPECT_EQ(current_device, initial_device); + + { + auto scoped_device = device_setter{device_b}; + // Confirm that device is currently device_b + RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); + EXPECT_EQ(current_device, device_b); + // Confirm that get_current_device reports expected device + EXPECT_EQ(current_device, device_setter::get_current_device()); + } + + // Confirm that device went back to initial value once setter was out of + // scope + RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); + EXPECT_EQ(current_device, initial_device); + + { + auto scoped_device1 = device_setter{device_b}; + auto scoped_device2 = device_setter{device_a}; + RAFT_CUDA_TRY(cudaGetDevice(¤t_device)); + // Confirm that multiple setters behave as expected, with the last + // constructed taking precedence + EXPECT_EQ(current_device, device_a); + } +} +} // namespace raft diff --git a/docs/source/cpp_api/core_resources.rst b/docs/source/cpp_api/core_resources.rst index e3d402d6af..85c454b355 100644 --- a/docs/source/cpp_api/core_resources.rst +++ b/docs/source/cpp_api/core_resources.rst @@ -35,6 +35,25 @@ namespace *raft::core* :project: RAFT :members: +Device Resources Manager +------------------------ + +While `raft::device_resources` provides a convenient way to access +device-related resources for a sequence of RAFT calls, it is sometimes useful +to be able to limit those resources across an entire application. For +instance, in highly multi-threaded applications, it can be helpful to limit +the total number of streams rather than relying on the default stream per +thread. `raft::device_resources_manager` offers a way to access +`raft::device_resources` instances that draw from a limited pool of +underlying device resources. + +``#include `` + +namespace *raft::core* + +.. doxygenclass:: raft::device_resources_manager + :project: RAFT + :members: Resource Functions ------------------