diff --git a/include/tvm/runtime/threading_backend.h b/include/tvm/runtime/threading_backend.h
index 43636ddbdb1f..d44b273153d0 100644
--- a/include/tvm/runtime/threading_backend.h
+++ b/include/tvm/runtime/threading_backend.h
@@ -28,6 +28,27 @@
 #include <memory>
 #include <vector>
 
+#if defined(__linux__) || defined(__ANDROID__)
+#if defined(__ANDROID__)
+#ifndef CPU_SET
+#define CPU_SETSIZE 1024
+#define __NCPUBITS (8 * sizeof(uint64_t))
+typedef struct {
+  uint64_t __bits[CPU_SETSIZE / __NCPUBITS];
+} cpu_set_t;
+
+#define CPU_SET(cpu, cpusetp) \
+  ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (1ULL << ((cpu) % __NCPUBITS)))
+#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t))
+#define CPU_ISSET(cpu, cpusetp)      \
+  ((1ULL << ((cpu) % __NCPUBITS)) == \
+   ((cpusetp)->__bits[(cpu) / __NCPUBITS] & (1ULL << ((cpu) % __NCPUBITS))))
+#define CPU_EQUAL(left, right) (memcmp((left), (right), sizeof(cpu_set_t)) == 0)
+
+#endif
+#endif
+#endif
+
 namespace tvm {
 namespace runtime {
 namespace threading {
@@ -64,21 +85,26 @@ class ThreadGroup {
   enum AffinityMode : int {
     kBig = 1,
     kLittle = -1,
+    /*! \brief Different threads will get different affinities. */
+    kSpecifyOneCorePerThread = -2,
+    /*! \brief All threads will get the same core group affinity. */
+    kSpecifyThreadShareAllCore = -3,
   };
-
   /*!
    * \brief configure the CPU id affinity
    *
-   * \param mode The preferred CPU type (1 = big, -1 = little).
+   * \param mode The preferred CPU type (1 = big, -1 = little,
+   *        -2 = kSpecifyOneCorePerThread, -3 = kSpecifyThreadShareAllCore).
    * \param nthreads The number of threads to use (0 = use all).
    * \param exclude_worker0 Whether to use the main thread as a worker.
    *        If `true`, worker0 will not be launched in a new thread and
    *        `worker_callback` will only be called for values >= 1. This
    *        allows use of the main thread as a worker.
+   * \param cpus A list of CPU IDs used to set the CPU affinity.
    *
    * \return The number of workers to use.
    */
-  int Configure(AffinityMode mode, int nthreads, bool exclude_worker0);
+  int Configure(AffinityMode mode, int nthreads, bool exclude_worker0,
+                std::vector<unsigned int> cpus = {});
 
  private:
   Impl* impl_;
@@ -88,12 +114,14 @@ class ThreadGroup {
  * \brief Platform-agnostic no-op.
  */
 void Yield();
-
 /*!
  * \return the maximum number of effective workers for this system.
 */
 int MaxConcurrency();
-
+/*!
+ * \brief Set the maximum number of available cores.
+ */
+void SetMaxConcurrency(int value);
 /*!
 * \brief Reset the threads in the pool. All current threads are destroyed and
 * new ones are created.
@@ -102,6 +130,16 @@ int MaxConcurrency();
 */
 void ResetThreadPool();
 
+/*!
+ * \brief Configure the CPU affinity mode of the worker threads.
+ * \param mode The preferred CPU type (1 = big, -1 = little,
+ *        -2 = kSpecifyOneCorePerThread, -3 = kSpecifyThreadShareAllCore).
+ * \param nthreads The number of threads to use (0 = use all).
+ * \param cpus A list of CPU IDs used to set the CPU affinity of the worker threads.
+ */
+void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
+               std::vector<unsigned int> cpus);
+
 }  // namespace threading
 }  // namespace runtime
 }  // namespace tvm
diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc
index c8d1845266e8..50b231a1db90 100644
--- a/src/runtime/thread_pool.cc
+++ b/src/runtime/thread_pool.cc
@@ -24,6 +24,7 @@
 #include <dmlc/thread_local.h>
 #include <tvm/runtime/c_backend_api.h>
 #include <tvm/runtime/c_runtime_api.h>
+#include <tvm/runtime/container/array.h>
 #include <tvm/runtime/logging.h>
 #include <tvm/runtime/packed_func.h>
 #include <tvm/runtime/registry.h>
@@ -42,12 +43,13 @@
 #include <thread>
 #include <vector>
 
+#include "../support/utils.h"
 const constexpr int kL1CacheBytes = 64;
 
 namespace tvm {
 namespace runtime {
 namespace {
-
+using support::IsNumber;
 constexpr uint32_t kDefaultSpinCount = 300000;
 
 uint32_t GetSpinCount() {
@@ -317,10 +319,11 @@ class ThreadPool {
 
   static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore<ThreadPool>::Get(); }
 
-  void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads) {
+  void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads,
+                                 const std::vector<unsigned int>& cpus) {
     // this will also reset the affinity of the ThreadGroup
     // may use less than the MaxConcurrency number of workers
-    num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_);
+    num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_, cpus);
     // if MaxConcurrency restricted the number of workers (e.g., due to
     // hyperthreading), respect the restriction
     num_workers_used_ = std::min(num_workers_, num_workers_used_);
@@ -369,17 +372,42 @@ class ThreadPool {
   std::unique_ptr<tvm::runtime::threading::ThreadGroup> threads_;
 };
 
+/*!
+ * \brief args[0] is the AffinityMode, args[1] is the number of threads, and
+ *        args[2] is a list of CPUs which is used to set the CPU affinity.
+ */
TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRetValue* rv) {
   threading::ThreadGroup::AffinityMode mode =
       static_cast<threading::ThreadGroup::AffinityMode>(static_cast<int>(args[0]));
   int nthreads = args[1];
-  ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads);
+  std::vector<unsigned int> cpus;
+  if (args.num_args >= 3) {
+    Array<String> cpu_array = args[2];
+    for (auto cpu : cpu_array) {
+      ICHECK(IsNumber(cpu)) << "The CPU core information '" << cpu << "' is not a number.";
+      cpus.push_back(std::stoi(cpu));
+    }
+  }
+  threading::Configure(mode, nthreads, cpus);
 });
 
 namespace threading {
 void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); }
 
+/*!
+ * \brief Configure the CPU id affinity.
+ * \param mode The preferred CPU type (1 = big, -1 = little,
+ *        -2 = kSpecifyOneCorePerThread, -3 = kSpecifyThreadShareAllCore).
+ * \param nthreads The number of threads to use (0 = use all).
+ * \param cpus A list of CPUs used to set the CPU affinity of the worker threads.
+ *
+ */
+void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
+               std::vector<unsigned int> cpus) {
+  tvm::runtime::threading::SetMaxConcurrency(cpus.size());
+  tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus);
+}
 }  // namespace threading
-
 }  // namespace runtime
 }  // namespace tvm
diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc
index 5b3093ac85cd..951e5eecd216 100644
--- a/src/runtime/threading_backend.cc
+++ b/src/runtime/threading_backend.cc
@@ -24,8 +24,6 @@
 #include <tvm/runtime/logging.h>
 #include <tvm/runtime/threading_backend.h>
 
-#include <algorithm>
-#include <thread>
 #if defined(__linux__) || defined(__ANDROID__)
 #include <fstream>
 #include <sstream>
@@ -37,11 +35,13 @@
 #if defined(__hexagon__)
 #include <dlfcn.h>
 #endif
-
+#include <algorithm>
+#include <thread>
+#define CURRENT_THREAD_HANDLE (static_cast<std::thread::native_handle_type>(0))
 namespace tvm {
 namespace runtime {
 namespace threading {
-
+thread_local int max_concurrency = 0;
 class ThreadGroup::Impl {
  public:
   Impl(int num_workers, std::function<void(int)> worker_callback, bool exclude_worker0)
@@ -60,15 +60,24 @@ class ThreadGroup::Impl {
     }
   }
 
-  int Configure(AffinityMode mode, int nthreads, bool exclude_worker0) {
+  int Configure(AffinityMode mode, int nthreads, bool exclude_worker0,
+                std::vector<unsigned int> cpus) {
     int num_workers_used = 0;
-    if (mode == kLittle) {
-      num_workers_used = little_count_;
-    } else if (mode == kBig) {
-      num_workers_used = big_count_;
-    } else {
-      // use default
-      num_workers_used = threading::MaxConcurrency();
+    switch (mode) {
+      case kLittle:
+        num_workers_used = little_count_;
+        break;
+      case kBig:
+        num_workers_used = big_count_;
+        break;
+      case kSpecifyOneCorePerThread:
+      case kSpecifyThreadShareAllCore:
+        num_workers_used = cpus.size();
+        sorted_order_ = cpus;
+        break;
+      default:
+        // use default
+        num_workers_used = threading::MaxConcurrency();
     }
     // if a specific number was given, use that
     if (nthreads) {
@@ -79,71 +88,92 @@ class ThreadGroup::Impl {
     // and N/2 physical cores this will set affinity to the first N/2 logical
     // ones.
     num_workers_used = std::min(num_workers_, num_workers_used);
-
-    const char* val = getenv("TVM_BIND_THREADS");
-    if (val == nullptr || atoi(val) == 1) {
-      // Do not set affinity if there are more workers than found cores
-      if (sorted_order_.size() >= static_cast<unsigned int>(num_workers_)) {
-        SetAffinity(exclude_worker0, mode == kLittle);
-      } else {
-        LOG(WARNING) << "The thread affinity cannot be set when the number of workers"
-                     << "is larger than the number of available cores in the system.";
-      }
-    }
+    SetAffinity(exclude_worker0, mode);
     return num_workers_used;
   }
 
  private:
-  // bind worker threads to disjoint cores
-  // if worker 0 is offloaded to main, i.e. exclude_worker0 is true,
-  // the main thread is bound to core 0.
-  void SetAffinity(bool exclude_worker0, bool reverse = false) {
-#if defined(__ANDROID__)
-#ifndef CPU_SET
-#define CPU_SETSIZE 1024
-#define __NCPUBITS (8 * sizeof(uint64_t))
-    typedef struct {
-      uint64_t __bits[CPU_SETSIZE / __NCPUBITS];
-    } cpu_set_t;
-
-#define CPU_SET(cpu, cpusetp) \
-  ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (1UL << ((cpu) % __NCPUBITS)))
-#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t))
-#endif
-#endif
+  void SetThreadAffinity(std::thread::native_handle_type thread,
+                         const std::vector<unsigned int>& ids) {
 #if defined(__linux__) || defined(__ANDROID__)
-    ICHECK_GE(sorted_order_.size(), num_workers_);
-
-    for (unsigned i = 0; i < threads_.size(); ++i) {
-      unsigned core_id;
-      if (reverse) {
-        core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1];
-      } else {
-        core_id = sorted_order_[i + exclude_worker0];
-      }
-      cpu_set_t cpuset;
-      CPU_ZERO(&cpuset);
-      CPU_SET(core_id, &cpuset);
+    if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) {
+      thread = pthread_self();
+    }
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    for (auto id : ids) {
+      CPU_SET(id, &cpuset);
+    }
 #if defined(__ANDROID__)
-      sched_setaffinity(threads_[i].native_handle(), sizeof(cpu_set_t), &cpuset);
+    sched_setaffinity(thread, sizeof(cpu_set_t), &cpuset);
 #else
-      pthread_setaffinity_np(threads_[i].native_handle(), sizeof(cpu_set_t), &cpuset);
+    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
 #endif
+#endif
+  }
+
+  // bind worker threads to disjoint cores
+  // if worker 0 is offloaded to main, i.e. exclude_worker0 is true,
+  // the main thread is bound to core 0.
+  void SetAffinity(bool exclude_worker0, AffinityMode mode) {
+    const char* val = getenv("TVM_BIND_THREADS");
+    if (val != nullptr && atoi(val) != 1) {
+      return;
     }
-    if (exclude_worker0) {  // main thread run task
-      // Master thread will have free migration on needed cores.
-      // Typically, the OS will schedule the main thread to run at core 0,
-      // which is idle, when other workers are running.
-      // See the comment inside SetMasterThreadFullCpuAffinity function to get more detail.
-      SetMasterThreadFullCpuAffinity(reverse);
+    // Do not set per-core affinity if there are more workers than found cores,
+    // unless the mode is one of the kSpecify* modes.
+    if (sorted_order_.size() < static_cast<unsigned int>(num_workers_)) {
+      switch (mode) {
+        // When the mode is kSpecifyOneCorePerThread or kSpecifyThreadShareAllCore,
+        // let the threads share all the CPU cores.
+        case kSpecifyOneCorePerThread:
+        case kSpecifyThreadShareAllCore:
+          for (unsigned i = 0; i < threads_.size(); ++i) {
+            SetThreadFullCpuAffinity(threads_[i].native_handle(), mode);
+          }
+          if (exclude_worker0) {  // main thread run task
+            SetMasterThreadFullCpuAffinity(mode);
+          }
+          break;
+        case kLittle:
+        case kBig:
+          LOG(WARNING) << "The thread affinity cannot be set when the number of workers "
+                       << "is larger than the number of available cores in the system.";
+          break;
+      }
+    } else {
+      ICHECK_GE(sorted_order_.size(), num_workers_);
+      switch (mode) {
+        case kSpecifyThreadShareAllCore:
+          for (unsigned i = 0; i < threads_.size(); ++i) {
+            SetThreadFullCpuAffinity(threads_[i].native_handle(), mode);
+          }
+          break;
+        case kLittle:
+        case kBig:
+        case kSpecifyOneCorePerThread:
+          for (unsigned i = 0; i < threads_.size(); ++i) {
+            bool reverse = (mode == kLittle);
+            unsigned core_id;
+            if (reverse) {
+              core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1];
+            } else {
+              core_id = sorted_order_[i + exclude_worker0];
+            }
+            SetThreadAffinity(threads_[i].native_handle(), {core_id});
+          }
+          break;
+      }
+      if (exclude_worker0) {  // main thread run task
+        // Master thread will have free migration on needed cores.
+        // Typically, the OS will schedule the main thread to run at core 0,
+        // which is idle, when other workers are running.
+        // See the comment inside SetMasterThreadFullCpuAffinity function to get more detail.
+        SetMasterThreadFullCpuAffinity(mode);
+      }
     }
-#endif
   }
 
-  void SetMasterThreadFullCpuAffinity(bool reverse) {
-#if defined(__linux__) || defined(__ANDROID__)
-    cpu_set_t cpuset;
-    CPU_ZERO(&cpuset);
+  void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) {
     // For example, we have 2xA72 + 4xA53 (id is 0 - 5, 4, 5 is A72 big core)
     // And we use config_threadpool API to set we will only use 4xA53.
     // The sorted_order will be [4, 5, 0, 1, 2, 3].
@@ -154,22 +184,31 @@ class ThreadGroup::Impl {
     // Note: this works well on x86 too. Because x86 doesn't have BIG.LITTLE,
     // our implementation will use kBig mode by default and will let main thread
     // run on intended cores.
-    if (reverse) {
-      for (int i = 0; i < little_count_; ++i) {
-        CPU_SET(sorted_order_[sorted_order_.size() - i - 1], &cpuset);
-      }
-    } else {
-      int num_cpu_workers = std::min(MaxConcurrency(), big_count_);
-      for (int i = 0; i < num_cpu_workers; ++i) {
-        CPU_SET(sorted_order_[i], &cpuset);
-      }
+    std::vector<unsigned int> ids;
+    switch (mode) {
+      case kSpecifyOneCorePerThread:
+      case kSpecifyThreadShareAllCore:
+        for (size_t i = 0; i < sorted_order_.size(); ++i) {
+          ids.push_back(sorted_order_[i]);
+        }
+        break;
+      case kLittle:
+        for (int i = 0; i < little_count_; ++i) {
+          ids.push_back(sorted_order_[sorted_order_.size() - i - 1]);
+        }
+        break;
+      case kBig: {
+        int num_cpu_workers = std::min(MaxConcurrency(), big_count_);
+        for (int i = 0; i < num_cpu_workers; ++i) {
+          ids.push_back(sorted_order_[i]);
+        }
+        break;
+      }
     }
-#if defined(__ANDROID__)
-    sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
-#else
-    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
-#endif
-#endif
+    SetThreadAffinity(thread, ids);
+  }
+
+  void SetMasterThreadFullCpuAffinity(AffinityMode mode) {
+    SetThreadFullCpuAffinity(CURRENT_THREAD_HANDLE, mode);
   }
 
   void InitSortedOrder() {
@@ -231,36 +270,48 @@ ThreadGroup::ThreadGroup(int num_workers, std::function<void(int)> worker_callba
 ThreadGroup::~ThreadGroup() { delete impl_; }
 void ThreadGroup::Join() { impl_->Join(); }
 
-int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0) {
-  return impl_->Configure(mode, nthreads, exclude_worker0);
+int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0,
+                           std::vector<unsigned int> cpus) {
+  return impl_->Configure(mode, nthreads, exclude_worker0, cpus);
 }
 
 void Yield() { std::this_thread::yield(); }
-
+/*!
+ * \brief Set the maximum number of available cores.
+ */
+void SetMaxConcurrency(int value) {
+  if (value > 0) {
+    max_concurrency = value;
+  }
+}
 int MaxConcurrency() {
   int max_concurrency = 1;
-  const char* val = getenv("TVM_NUM_THREADS");
-  if (val == nullptr) {
-    val = getenv("OMP_NUM_THREADS");
-  }
-  if (val != nullptr) {
-    max_concurrency = atoi(val);
+  if (tvm::runtime::threading::max_concurrency != 0) {
+    max_concurrency = tvm::runtime::threading::max_concurrency;
   } else {
-    max_concurrency = std::thread::hardware_concurrency();
+    const char* val = getenv("TVM_NUM_THREADS");
+    if (val == nullptr) {
+      val = getenv("OMP_NUM_THREADS");
+    }
+    if (val != nullptr) {
+      max_concurrency = atoi(val);
+    } else {
+      max_concurrency = std::thread::hardware_concurrency();
 #if defined(_M_X64) || defined(__x86_64__)
-    max_concurrency /= 2;  // ignore hyper-threading
+      max_concurrency /= 2;  // ignore hyper-threading
 #elif defined(__hexagon__)
-    // With unsigned PDs, getting the number of available hardware threads
-    // is not supported in earlier versions of QuRT. In such cases assume 4.
-    // If running on simulator, set max_concurrency to 1.
-    if (max_concurrency == 0) {
-      if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7")) {
-        max_concurrency = 1;
-      } else {
-        max_concurrency = 4;
+      // With unsigned PDs, getting the number of available hardware threads
+      // is not supported in earlier versions of QuRT. In such cases assume 4.
+      // If running on simulator, set max_concurrency to 1.
+      if (max_concurrency == 0) {
+        if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7")) {
+          max_concurrency = 1;
+        } else {
+          max_concurrency = 4;
+        }
       }
-    }
 #endif
+    }
   }
   return std::max(max_concurrency, 1);
 }
diff --git a/tests/cpp/threading_backend_test.cc b/tests/cpp/threading_backend_test.cc
index 948838971796..db32623531b8 100644
--- a/tests/cpp/threading_backend_test.cc
+++ b/tests/cpp/threading_backend_test.cc
@@ -17,22 +17,104 @@
  * under the License.
  */
 
 #include <gtest/gtest.h>
 #include <tvm/runtime/c_backend_api.h>
 #include <tvm/runtime/threading_backend.h>
 
 #include <atomic>
+#include <memory>
+#include <mutex>
+#include <sstream>
 #include <thread>
+#include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 constexpr size_t N = 128;
 
+void AtomicCompute(int task_id, size_t n, std::atomic<size_t>* acc, TVMParallelGroupEnv* penv) {
+  const size_t N_per_task = (n + penv->num_task - 1) / penv->num_task;
+  for (size_t i = task_id * N_per_task; i < n && i < (task_id + 1) * N_per_task; ++i) {
+    acc->fetch_add(i, std::memory_order_relaxed);
+  }
+}
+
+class AffinityCheck {
+ public:
+  AffinityCheck(uint32_t parent_id, int max_concurrency, std::atomic<size_t>* acc)
+      : id_(parent_id), max_concurrency_(max_concurrency), acc_(acc) {}
+
+  void Compute(int task_id, size_t n, TVMParallelGroupEnv* penv) {
+    AtomicCompute(task_id, n, acc_, penv);
+  }
+
+  int GetComputeResult() { return acc_->load(std::memory_order_relaxed); }
+
+  void GetAffinity(int task_id) {
+#if defined(__linux__)
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
+    std::lock_guard<std::mutex> lock(mutex_);
+    thread_affinity_[task_id] = cpuset;
+    // Print the current thread's CPU affinity.
+    std::ostringstream str;
+    for (int i = 0; i < max_concurrency_; i++) {
+      if (CPU_ISSET(i, &cpuset)) {
+        str << i << ",";
+      }
+    }
+    LOG(INFO) << "id:" << id_ << " taskid:" << task_id << " affinity:" << str.str();
+#endif
+  }
+
+  bool VerifyAffinity(const std::vector<unsigned int>& cpus) {
+#if defined(__linux__)
+    std::unordered_set<unsigned int> uset;
+    cpu_set_t cpu_mask;
+    CPU_ZERO(&cpu_mask);
+    for (auto x : cpus) {
+      CPU_SET(x, &cpu_mask);
+      uset.insert(x);
+    }
+
+    for (auto x : thread_affinity_) {
+      if (!CPU_EQUAL(&cpu_mask, &x.second)) {
+        bool cpu_find = false;
+        for (auto cpu : uset) {
+          if (CPU_ISSET(cpu, &x.second)) {
+            uset.erase(cpu);
+            cpu_find = true;
+            break;
+          }
+        }
+        if (!cpu_find) return false;
+      }
+    }
+#endif
+    return true;
+  }
+
+ private:
+  uint32_t id_;
+  int max_concurrency_;
+  std::atomic<size_t>* acc_;
+  std::mutex mutex_;
+#if defined(__linux__)
+  std::unordered_map<int, cpu_set_t> thread_affinity_;
+#endif
+};
 
 static FTVMParallelLambda atomic_add_task_id = [](int task_id, TVMParallelGroupEnv* penv,
                                                   void* cdata) -> int {
   auto* data = reinterpret_cast<std::atomic<size_t>*>(cdata);
-  const size_t N_per_task = (N + penv->num_task - 1) / penv->num_task;
-  for (size_t i = task_id * N_per_task; i < N && i < (task_id + 1) * N_per_task; ++i) {
-    data->fetch_add(i, std::memory_order_relaxed);
-  }
+  AtomicCompute(task_id, N, data, penv);
   return 0;
 };
 
+static FTVMParallelLambda affinity_check_task_id = [](int task_id, TVMParallelGroupEnv* penv,
+                                                      void* cdata) -> int {
+  auto* data = reinterpret_cast<AffinityCheck*>(cdata);
+  data->Compute(task_id, N, penv);
+  data->GetAffinity(task_id);
+  return 0;
+};
+
@@ -63,3 +145,43 @@ TEST(ThreadingBackend, TVMBackendParallelLaunchMultipleThreads) {
     }
   }
 }
+
+TEST(ThreadingBackend, TVMBackendAffinityConfigure) {
+  int max_concurrency = tvm::runtime::threading::MaxConcurrency();
+  std::vector<std::unique_ptr<std::thread>> ts;
+  // Return early if there is only one CPU available.
+  if (max_concurrency <= 1) {
+    return;
+  }
+  // Create two threads to test the 'CPU list affinity' feature.
+  const int threads_num = 2;
+  // The number of CPUs assigned to each thread pool.
+  const int cpus_num_per_thread = max_concurrency / threads_num;
+  // Test the two kSpecify* affinity modes.
+  std::vector<tvm::runtime::threading::ThreadGroup::AffinityMode> modes = {
+      tvm::runtime::threading::ThreadGroup::kSpecifyOneCorePerThread,
+      tvm::runtime::threading::ThreadGroup::kSpecifyThreadShareAllCore};
+  for (auto mode : modes) {
+    for (int thread_pool_idx = 0; thread_pool_idx < threads_num; thread_pool_idx++) {
+      ts.emplace_back(new std::thread(
+          [&](int thread_pool_index, int sys_max_concurrency,
+              tvm::runtime::threading::ThreadGroup::AffinityMode affinity_mode) {
+            std::atomic<size_t> acc(0);
+            AffinityCheck ac(thread_pool_index, sys_max_concurrency, &acc);
+            std::vector<unsigned int> cpus;
+            for (int k = 0; k < cpus_num_per_thread; k++) {
+              cpus.push_back(thread_pool_index * cpus_num_per_thread + k);
+            }
+            tvm::runtime::threading::Configure(affinity_mode, 0, cpus);
+            TVMBackendParallelLaunch(affinity_check_task_id, &ac, 0);
+            EXPECT_EQ(ac.GetComputeResult(), N * (N - 1) / 2);
+            EXPECT_EQ(ac.VerifyAffinity(cpus), true);
+          },
+          thread_pool_idx, max_concurrency, mode));
+    }
+  }
+  for (auto& t : ts) {
+    t->join();
+  }
+}
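
Usage note (not part of the patch): the sketch below shows one way to drive the affinity API introduced by this diff from C++ client code. It is a minimal example under stated assumptions: the file name and core IDs are illustrative, and on a real target the IDs should come from the platform topology. The same configuration is also reachable from Python through the "runtime.config_threadpool" packed function registered above, with the CPU list passed as an array of numeric strings.

// affinity_example.cc - minimal usage sketch (hypothetical file, illustrative core IDs).
#include <tvm/runtime/threading_backend.h>

#include <vector>

int main() {
  using tvm::runtime::threading::ThreadGroup;

  // Pin each worker thread to its own core from the list (kSpecifyOneCorePerThread).
  // nthreads = 0 means "use all"; the worker count is derived from the list,
  // since Configure() calls SetMaxConcurrency(cpus.size()) internally.
  std::vector<unsigned int> cpus = {0, 1, 2, 3};
  tvm::runtime::threading::Configure(ThreadGroup::kSpecifyOneCorePerThread,
                                     /*nthreads=*/0, cpus);

  // Alternatively, let every worker float across the whole list:
  // tvm::runtime::threading::Configure(ThreadGroup::kSpecifyThreadShareAllCore,
  //                                    /*nthreads=*/0, cpus);
  return 0;
}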