diff --git a/include/tvm/runtime/threading_backend.h b/include/tvm/runtime/threading_backend.h
index 43636ddbdb1f9..976a0172be640 100644
--- a/include/tvm/runtime/threading_backend.h
+++ b/include/tvm/runtime/threading_backend.h
@@ -28,6 +28,27 @@
 #include
 #include

+#if defined(__linux__) || defined(__ANDROID__)
+#if defined(__ANDROID__)
+#ifndef CPU_SET
+#define CPU_SETSIZE 1024
+#define __NCPUBITS (8 * sizeof(uint64_t))
+typedef struct {
+  uint64_t __bits[CPU_SETSIZE / __NCPUBITS];
+} cpu_set_t;
+
+#define CPU_SET(cpu, cpusetp) \
+  ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (1UL << ((cpu) % __NCPUBITS)))
+#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t))
+#define CPU_ISSET(cpu, cpusetp) \
+  (1UL << ((cpu) % __NCPUBITS)) == \
+      ((cpusetp)->__bits[(cpu) / __NCPUBITS] & (1UL << ((cpu) % __NCPUBITS)))
+#define CPU_EQUAL(left, right) (memcmp(&left, &right, sizeof(cpu_set_t)) == 0)
+
+#endif
+#endif
+#endif
+
 namespace tvm {
 namespace runtime {
 namespace threading {
@@ -64,6 +85,7 @@ class ThreadGroup {
   enum AffinityMode : int {
     kBig = 1,
     kLittle = -1,
+    kSpecify = -2,
   };

   /*!
@@ -71,6 +93,7 @@ class ThreadGroup {
    *
    * \param mode The preferred CPU type (1 = big, -1 = little).
    * \param nthreads The number of threads to use (0 = use all).
+   * \param cpus A list of CPU ids to use for affinity setting.
    * \param exclude_worker0 Whether to use the main thread as a worker.
    *        If `true`, worker0 will not be launched in a new thread and
    *        `worker_callback` will only be called for values >= 1. This
@@ -78,7 +101,8 @@ class ThreadGroup {
    *
    * \return The number of workers to use.
    */
-  int Configure(AffinityMode mode, int nthreads, bool exclude_worker0);
+  int Configure(AffinityMode mode, int nthreads, std::vector cpus,
+                bool exclude_worker0);

  private:
   Impl* impl_;
@@ -94,6 +118,8 @@ void Yield();
  */
 int MaxConcurrency();

+void SetMaxConcurrency(int value);
+
 /*!
 * \brief Reset the threads in the pool. All current threads are destroyed and
 *        new ones are created.
@@ -102,6 +128,9 @@ int MaxConcurrency();
 */
 void ResetThreadPool();

+void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
+               std::vector cpus, int max_concurrency = 0);
+
 }  // namespace threading
 }  // namespace runtime
 }  // namespace tvm
diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc
index c8d1845266e88..10d8865dadb0c 100644
--- a/src/runtime/thread_pool.cc
+++ b/src/runtime/thread_pool.cc
@@ -28,6 +28,7 @@
 #include
 #include
 #include
+#include
 #if TVM_THREADPOOL_USE_OPENMP
 #include
 #endif
@@ -317,10 +318,11 @@ class ThreadPool {

   static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore::Get(); }

-  void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads) {
+  void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads,
+                                 const std::vector& cpus) {
     // this will also reset the affinity of the ThreadGroup
     // may use less than the MaxConcurrency number of workers
-    num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_);
+    num_workers_used_ = threads_->Configure(mode, nthreads, cpus, exclude_worker0_);
     // if MaxConcurrency restricted the number of workers (e.g., due to
     // hyperthreading), respect the restriction
     num_workers_used_ = std::min(num_workers_, num_workers_used_);
@@ -337,7 +339,7 @@ class ThreadPool {
         new tvm::runtime::threading::ThreadGroup(
             num_workers_, [this](int worker_id) { this->RunWorker(worker_id); },
             exclude_worker0_ /* include_main_thread */));
-    num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_);
+    num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, {}, exclude_worker0_);
   }

   // Internal worker function.
@@ -373,13 +375,28 @@ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRe
   threading::ThreadGroup::AffinityMode mode =
       static_cast(static_cast(args[0]));
   int nthreads = args[1];
-  ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads);
+  std::vector cpus;
+  int max_concurrency = 0;
+  if (args.num_args >= 3) {
+    Array cpu_array = args[2];
+    for (auto cpu : cpu_array) {
+      cpus.push_back(cpu);
+    }
+  }
+  if (args.num_args == 4) {
+    max_concurrency = args[3];
+  }
+  threading::Configure(mode, nthreads, cpus, max_concurrency);
 });

 namespace threading {

 void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); }
+void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
+               std::vector cpus, int max_concurrency) {
+  tvm::runtime::threading::SetMaxConcurrency(max_concurrency);
+  tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus);
+}
 }  // namespace threading
-
 }  // namespace runtime
 }  // namespace tvm
diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc
index 5b3093ac85cda..3811399cba474 100644
--- a/src/runtime/threading_backend.cc
+++ b/src/runtime/threading_backend.cc
@@ -24,8 +24,6 @@
 #include
 #include

-#include
-#include
 #if defined(__linux__) || defined(__ANDROID__)
 #include
 #include
@@ -37,11 +35,14 @@
 #if defined(__hexagon__)
 #include
 #endif
+#include
+#include
+#define CURRENT_THREAD_HANDLE ((pthread_t)-1)

 namespace tvm {
 namespace runtime {
 namespace threading {
-
+thread_local int max_concurrency_value = 0;
 class ThreadGroup::Impl {
  public:
   Impl(int num_workers, std::function worker_callback, bool exclude_worker0)
@@ -60,12 +61,17 @@ class ThreadGroup::Impl {
     }
   }

-  int Configure(AffinityMode mode, int nthreads, bool exclude_worker0) {
+  int Configure(AffinityMode mode, int nthreads, std::vector cpus,
+                bool exclude_worker0) {
     int num_workers_used = 0;
     if (mode == kLittle) {
       num_workers_used = little_count_;
     } else if (mode == kBig) {
       num_workers_used = big_count_;
+    } else if (mode == kSpecify) {
+      num_workers_used = cpus.size();
+      // Set the cpu list.
+      UpdateSortedOrder(cpus);
     } else {
       // use default
       num_workers_used = threading::MaxConcurrency();
@@ -79,71 +85,76 @@
     // and N/2 physical cores this will set affinity to the first N/2 logical
     // ones.
     num_workers_used = std::min(num_workers_, num_workers_used);
-
-    const char* val = getenv("TVM_BIND_THREADS");
-    if (val == nullptr || atoi(val) == 1) {
-      // Do not set affinity if there are more workers than found cores
-      if (sorted_order_.size() >= static_cast(num_workers_)) {
-        SetAffinity(exclude_worker0, mode == kLittle);
-      } else {
-        LOG(WARNING) << "The thread affinity cannot be set when the number of workers"
-                     << "is larger than the number of available cores in the system.";
-      }
-    }
+    SetAffinity(exclude_worker0, mode);
     return num_workers_used;
   }

  private:
-  // bind worker threads to disjoint cores
-  // if worker 0 is offloaded to main, i.e. exclude_worker0 is true,
-  // the main thread is bound to core 0.
-  void SetAffinity(bool exclude_worker0, bool reverse = false) {
+  void SetThreadAffinity(std::thread::native_handle_type thread,
+                         const std::vector& ids) {
+#if defined(__linux__) || defined(__ANDROID__)
+    if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) {
+      thread = pthread_self();
+    }
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    for (auto id : ids) {
+      CPU_SET(id, &cpuset);
+    }
 #if defined(__ANDROID__)
-#ifndef CPU_SET
-#define CPU_SETSIZE 1024
-#define __NCPUBITS (8 * sizeof(uint64_t))
-    typedef struct {
-      uint64_t __bits[CPU_SETSIZE / __NCPUBITS];
-    } cpu_set_t;
-
-#define CPU_SET(cpu, cpusetp) \
-  ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (1UL << ((cpu) % __NCPUBITS)))
-#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t))
+    sched_setaffinity(thread, sizeof(cpu_set_t), &cpuset);
+#else
+    pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
 #endif
 #endif
-#if defined(__linux__) || defined(__ANDROID__)
-    ICHECK_GE(sorted_order_.size(), num_workers_);
+  }

-    for (unsigned i = 0; i < threads_.size(); ++i) {
-      unsigned core_id;
-      if (reverse) {
-        core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1];
+  // bind worker threads to disjoint cores
+  // if worker 0 is offloaded to main, i.e. exclude_worker0 is true,
+  // the main thread is bound to core 0.
+  void SetAffinity(bool exclude_worker0, AffinityMode mode) {
+    const char* val = getenv("TVM_BIND_THREADS");
+    if (val != nullptr && atoi(val) != 1) {
+      return;
+    }
+    // Do not set affinity if there are more workers than found cores and mode is not kSpecify.
+    if (sorted_order_.size() < static_cast(num_workers_)) {
+      if (mode == kSpecify) {
+        // When a cpu list is given and the mode is kSpecify, restrict the worker
+        // threads to that cpu list.
+        for (unsigned i = 0; i < threads_.size(); ++i) {
+          SetThreadFullCpuAffinity(threads_[i].native_handle(), mode);
+        }
+        if (exclude_worker0) {  // the main thread also runs tasks
+          SetMasterThreadFullCpuAffinity(mode);
+        }
       } else {
-        core_id = sorted_order_[i + exclude_worker0];
+        LOG(WARNING) << "The thread affinity cannot be set when the number of workers"
+                     << " is larger than the number of available cores in the system.";
+      }
+    } else {
+      ICHECK_GE(sorted_order_.size(), num_workers_);
+      for (unsigned i = 0; i < threads_.size(); ++i) {
+        bool reverse = mode == kLittle;
+        unsigned core_id;
+        if (reverse) {
+          core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1];
+        } else {
+          core_id = sorted_order_[i + exclude_worker0];
+        }
+        SetThreadAffinity(threads_[i].native_handle(), {core_id});
+      }
+      if (exclude_worker0) {  // the main thread also runs tasks
+        // Master thread will have free migration on needed cores.
+        // Typically, the OS will schedule the main thread to run at core 0,
+        // which is idle, when other workers are running.
+        // See the comment inside SetMasterThreadFullCpuAffinity function to get more detail.
+        SetMasterThreadFullCpuAffinity(mode);
       }
-      cpu_set_t cpuset;
-      CPU_ZERO(&cpuset);
-      CPU_SET(core_id, &cpuset);
-#if defined(__ANDROID__)
-      sched_setaffinity(threads_[i].native_handle(), sizeof(cpu_set_t), &cpuset);
-#else
-      pthread_setaffinity_np(threads_[i].native_handle(), sizeof(cpu_set_t), &cpuset);
-#endif
-    }
-    if (exclude_worker0) {  // main thread run task
-      // Master thread will have free migration on needed cores.
-      // Typically, the OS will schedule the main thread to run at core 0,
-      // which is idle, when other workers are running.
-      // See the comment inside SetMasterThreadFullCpuAffinity function to get more detail.
-      SetMasterThreadFullCpuAffinity(reverse);
     }
-#endif
   }

-  void SetMasterThreadFullCpuAffinity(bool reverse) {
-#if defined(__linux__) || defined(__ANDROID__)
-    cpu_set_t cpuset;
-    CPU_ZERO(&cpuset);
+  void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) {
     // For example, we have 2xA72 + 4xA53 (id is 0 - 5, 4, 5 is A72 big core)
     // And we use config_threadpool API to set we will only use 4xA53.
     // The sorted_order will be [4, 5, 0, 1, 2, 3].
@@ -154,22 +165,31 @@ class ThreadGroup::Impl {
     // Note: this works well on x86 too. Because x86 doesn't have BIG.LITTLE,
     // our implementation will use kBig mode by default and will let main thread
     // run on intended cores.
-    if (reverse) {
+    std::vector ids;
+    if (mode == kSpecify) {
+      for (size_t i = 0; i < sorted_order_.size(); ++i) {
+        ids.push_back(sorted_order_[i]);
+      }
+    } else if (mode == kLittle) {
       for (int i = 0; i < little_count_; ++i) {
-        CPU_SET(sorted_order_[sorted_order_.size() - i - 1], &cpuset);
+        ids.push_back(sorted_order_[sorted_order_.size() - i - 1]);
       }
     } else {
       int num_cpu_workers = std::min(MaxConcurrency(), big_count_);
       for (int i = 0; i < num_cpu_workers; ++i) {
-        CPU_SET(sorted_order_[i], &cpuset);
+        ids.push_back(sorted_order_[i]);
       }
     }
-#if defined(__ANDROID__)
-    sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
-#else
-    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
-#endif
-#endif
+    SetThreadAffinity(thread, ids);
+  }
+
+  void SetMasterThreadFullCpuAffinity(AffinityMode mode) {
+    SetThreadFullCpuAffinity(CURRENT_THREAD_HANDLE, mode);
+  }
+
+  void UpdateSortedOrder(std::vector cpus) {
+    sorted_order_ = cpus;
+    return;
   }

   void InitSortedOrder() {
@@ -231,36 +251,45 @@ ThreadGroup::ThreadGroup(int num_workers, std::function worker_callba
 ThreadGroup::~ThreadGroup() { delete impl_; }

 void ThreadGroup::Join() { impl_->Join(); }

-int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0) {
-  return impl_->Configure(mode, nthreads, exclude_worker0);
+int ThreadGroup::Configure(AffinityMode mode, int nthreads, std::vector cpus,
+                           bool exclude_worker0) {
+  return impl_->Configure(mode, nthreads, cpus, exclude_worker0);
 }

 void Yield() { std::this_thread::yield(); }
-
+void SetMaxConcurrency(int value) {
+  if (value > 0) {
+    max_concurrency_value = value;
+  }
+}
 int MaxConcurrency() {
   int max_concurrency = 1;
-  const char* val = getenv("TVM_NUM_THREADS");
-  if (val == nullptr) {
-    val = getenv("OMP_NUM_THREADS");
-  }
-  if (val != nullptr) {
-    max_concurrency = atoi(val);
+  if (tvm::runtime::threading::max_concurrency_value != 0) {
+    max_concurrency = tvm::runtime::threading::max_concurrency_value;
   } else {
-    max_concurrency = std::thread::hardware_concurrency();
+    const char* val = getenv("TVM_NUM_THREADS");
+    if (val == nullptr) {
+      val = getenv("OMP_NUM_THREADS");
+    }
+    if (val != nullptr) {
+      max_concurrency = atoi(val);
+    } else {
+      max_concurrency = std::thread::hardware_concurrency();
 #if defined(_M_X64) || defined(__x86_64__)
-    max_concurrency /= 2;  // ignore hyper-threading
+      max_concurrency /= 2;  // ignore hyper-threading
 #elif defined(__hexagon__)
-    // With unsigned PDs, getting the number of available hardware threads
-    // is not supported in earlier versions of QuRT. In such cases assume 4.
-    // If running on simulator, set max_concurrency to 1.
-    if (max_concurrency == 0) {
-      if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7")) {
-        max_concurrency = 1;
-      } else {
-        max_concurrency = 4;
+      // With unsigned PDs, getting the number of available hardware threads
+      // is not supported in earlier versions of QuRT. In such cases assume 4.
+      // If running on simulator, set max_concurrency to 1.
+      if (max_concurrency == 0) {
+        if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7")) {
+          max_concurrency = 1;
+        } else {
+          max_concurrency = 4;
+        }
       }
-    }
 #endif
+    }
   }
   return std::max(max_concurrency, 1);
 }
diff --git a/tests/cpp/threading_backend_test.cc b/tests/cpp/threading_backend_test.cc
index 9488389717967..d697c7dc30756 100644
--- a/tests/cpp/threading_backend_test.cc
+++ b/tests/cpp/threading_backend_test.cc
@@ -17,22 +17,102 @@
  * under the License.
  */

+#include
 #include
 #include
+#include
 #include
 #include
+#include
 #include
+#include
+#include

 constexpr size_t N = 128;

+void AtomicCompute(int task_id, size_t n, std::atomic* acc, TVMParallelGroupEnv* penv) {
+  const size_t N_per_task = (n + penv->num_task - 1) / penv->num_task;
+  for (size_t i = task_id * N_per_task; i < n && i < (task_id + 1) * N_per_task; ++i) {
+    acc->fetch_add(i, std::memory_order_relaxed);
+  }
+  return;
+}
+
+class AffinityCheck {
+ public:
+  AffinityCheck(uint32_t parent_id, int max_concurrency, std::atomic* acc)
+      : id_(parent_id), max_concurrency_(max_concurrency), acc_(acc) {}
+
+  void Compute(int task_id, size_t n, TVMParallelGroupEnv* penv) {
+    AtomicCompute(task_id, n, acc_, penv);
+  }
+
+  int GetComputeResult() { return acc_->load(std::memory_order_relaxed); }
+
+  void GetAffinity(int task_id) {
+#if defined(__linux__)
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
+    std::lock_guard lock(mutex_);
+    thread_affinity_[task_id] = cpuset;
+    // Print the affinity information of the current thread into the log.
+    std::ostringstream str;
+    for (int i = 0; i < max_concurrency_; i++) {
+      if (CPU_ISSET(i, &cpuset)) {
+        str << i << ",";
+      }
+    }
+    LOG(INFO) << "id:" << id_ << " taskid:" << task_id << " affinity:" << str.str() << std::endl;
+#endif
+  }
+
+  bool VerifyAffinity(const std::vector& cpus) {
+#if defined(__linux__)
+    std::unordered_set uset;
+    cpu_set_t cpu_mask;
+    CPU_ZERO(&cpu_mask);
+    for (auto x : cpus) {
+      CPU_SET(x, &cpu_mask);
+      uset.insert(x);
+    }
+
+    for (auto x : thread_affinity_) {
+      if (!CPU_EQUAL(&cpu_mask, &x.second)) {
+        bool cpu_find = false;
+        for (auto cpu : uset) {
+          if (CPU_ISSET(cpu, &x.second)) {
+            uset.erase(cpu);
+            cpu_find = true;
+            break;
+          }
+        }
+        if (!cpu_find) return false;
+      }
+    }
+#endif
+    return true;
+  }
+
+ private:
+  uint32_t id_;
+  int max_concurrency_;
+  std::atomic* acc_;
+  std::mutex mutex_;
+  std::unordered_map thread_affinity_;
+};

 static FTVMParallelLambda atomic_add_task_id = [](int task_id, TVMParallelGroupEnv* penv,
                                                   void* cdata) -> int {
   auto* data = reinterpret_cast*>(cdata);
-  const size_t N_per_task = (N + penv->num_task - 1) / penv->num_task;
-  for (size_t i = task_id * N_per_task; i < N && i < (task_id + 1) * N_per_task; ++i) {
-    data->fetch_add(i, std::memory_order_relaxed);
-  }
+  AtomicCompute(task_id, N, data, penv);
+  return 0;
+};
+
+static FTVMParallelLambda affinity_check_task_id = [](int task_id, TVMParallelGroupEnv* penv,
+                                                      void* cdata) -> int {
+  auto* data = reinterpret_cast(cdata);
+  data->Compute(task_id, N, penv);
+  data->GetAffinity(task_id);
   return 0;
 };
@@ -63,3 +143,43 @@ TEST(ThreadingBackend, TVMBackendParallelLaunchMultipleThreads) {
     }
   }
 }
+
+TEST(ThreadingBackend, TVMBackendAffinityConfigure) {
+  int max_concurrency = tvm::runtime::threading::MaxConcurrency();
+  std::vector> ts;
+  if (max_concurrency <= 1) {
+    return;
+  }
+  const int thread_pool_num = max_concurrency > 1 ? 2 : 1;
+  const int cpus_num_per_pool = max_concurrency / thread_pool_num;
+  std::vector concurrency = {0, 3};
+  for (auto value : concurrency) {
+    for (int i = 0; i < thread_pool_num; i++) {
+      ts.emplace_back(new std::thread(
+          [&](int j, int sys_max_concurrency, int concurrency_config) {
+            std::atomic acc(0);
+            AffinityCheck ac(j, sys_max_concurrency, &acc);
+            std::vector cpus;
+            for (int k = 0; k < cpus_num_per_pool; k++) {
+              cpus.push_back(j * cpus_num_per_pool + k);
+            }
+            if (concurrency_config != 0) {
+              // Set the maximum concurrency (here 3) as well as the affinity cpu list.
+              tvm::runtime::threading::Configure(tvm::runtime::threading::ThreadGroup::kSpecify, 0,
+                                                 cpus, concurrency_config);
+            } else {
+              // Leave the maximum concurrency unrestricted and only set the affinity cpu list.
+              tvm::runtime::threading::Configure(tvm::runtime::threading::ThreadGroup::kSpecify, 0,
+                                                 cpus);
+            }
+            TVMBackendParallelLaunch(affinity_check_task_id, &ac, 0);
+            EXPECT_EQ(ac.GetComputeResult(), N * (N - 1) / 2);
+            EXPECT_EQ(ac.VerifyAffinity(cpus), true);
+          },
+          i, max_concurrency, value));
+    }
+  }
+  for (auto& t : ts) {
+    t->join();
+  }
+}
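
Usage sketch for the new C++ entry point. This is a minimal illustration, not code taken from the patch: it assumes the cpu list is a std::vector<unsigned int> (the template arguments are not visible in the diff above) and that the translation unit links against the TVM runtime.

#include <tvm/runtime/threading_backend.h>

#include <vector>

// Pin the worker threads of the calling thread's pool to cores 0-3 and cap
// MaxConcurrency() at 3. kSpecify (-2) tells the ThreadGroup to take the
// affinity list verbatim instead of deriving it from the big/little ordering.
// The element type of `cpus` is an assumption; adjust it to the declaration
// in threading_backend.h.
void PinWorkersToCluster() {
  std::vector<unsigned int> cpus = {0, 1, 2, 3};
  tvm::runtime::threading::Configure(tvm::runtime::threading::ThreadGroup::kSpecify,
                                     /*nthreads=*/0, cpus, /*max_concurrency=*/3);
}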
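
The same configuration path is also reachable through the "runtime.config_threadpool" global registered in thread_pool.cc. A hedged sketch of the two-argument form, which leaves the cpu list empty and the concurrency cap unset (the exact element type accepted for the optional cpu-list argument is not shown in the diff, so it is omitted here):

#include <tvm/runtime/registry.h>

void UseBigCoresOnly(int nthreads) {
  // Look up the packed function registered by TVM_REGISTER_GLOBAL above.
  const tvm::runtime::PackedFunc* f =
      tvm::runtime::Registry::Get("runtime.config_threadpool");
  if (f != nullptr) {
    (*f)(1 /* ThreadGroup::kBig */, nthreads);
  }
}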
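
For reference, the affinity pattern that SetThreadAffinity() wraps is the standard Linux cpu_set_t API; on Android the shim macros added to threading_backend.h provide the same interface. A standalone sketch, with error handling omitted and the helper name chosen only for illustration:

#ifndef _GNU_SOURCE
#define _GNU_SOURCE  // needed for pthread_setaffinity_np on glibc
#endif
#include <pthread.h>
#include <sched.h>

// Restrict the calling thread to the given core ids.
static int PinCurrentThread(const unsigned* ids, int n) {
  cpu_set_t cpuset;
  CPU_ZERO(&cpuset);           // start from an empty mask
  for (int i = 0; i < n; ++i) {
    CPU_SET(ids[i], &cpuset);  // add each requested core
  }
  // The patch uses (pthread_t)-1 as CURRENT_THREAD_HANDLE and resolves it to
  // pthread_self(); here the calling thread is addressed directly.
  return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
}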