From a01df59db0a872eaca427c9e2fff9e2ef9fd1a79 Mon Sep 17 00:00:00 2001 From: huajsj Date: Fri, 17 Dec 2021 17:34:03 -0800 Subject: [PATCH 01/11] [Runtime][ThreadPool] Refactor affinity function and support CPU affinity list setting. Issue: 1. There are multiple affinity function using "LINUX" and "ANDROID" macro check and the multiple check make the logic maintain and change become complex. 2. Current logic of tvm [Runtime][ThreadPool] assume all of the cpu resources are available for a single backend runtime to do the data flow computation. But such assumption may not true when user running multiple task on the system and not want tvm task exhaust all of the cpu resource, or when user going to run multiple backend runtime of tvm on the system, each backend runtime of tvm should use different cpu affinity settings to achieve best performance. Solution: 1.Refactor the affinity functions to move the "LINUX" and "ANDROID" check into one function. 2.In this solution, we introduce a new "CPU AffinityMode type" named "kSpecify", by using "kSpecify" and the function named "tvm::runtime::threading ::Configure" user can specify the cpu list for the cpu affinity of a backend runtime. This solution reused the existing per thread thread pool logic of [Runtime][Threadpool] that created a worker thread pool for current thread which can running a particular runtime. for a multiple runtime use case, user can first launch multiple threads, then call "tvm::runtime::threading ::Configure" with cpu list to create tvm data flow worker thread pool, after doing this the execution of the multiple runtime on the multiple threads will use different cpu resource list. --- include/tvm/runtime/threading_backend.h | 31 +++- src/runtime/thread_pool.cc | 27 +++- src/runtime/threading_backend.cc | 199 ++++++++++++++---------- tests/cpp/threading_backend_test.cc | 128 ++++++++++++++- 4 files changed, 290 insertions(+), 95 deletions(-) diff --git a/include/tvm/runtime/threading_backend.h b/include/tvm/runtime/threading_backend.h index 43636ddbdb1f..976a0172be64 100644 --- a/include/tvm/runtime/threading_backend.h +++ b/include/tvm/runtime/threading_backend.h @@ -28,6 +28,27 @@ #include #include +#if defined(__linux__) || defined(__ANDROID__) +#if defined(__ANDROID__) +#ifndef CPU_SET +#define CPU_SETSIZE 1024 +#define __NCPUBITS (8 * sizeof(uint64_t)) +typedef struct { + uint64_t __bits[CPU_SETSIZE / __NCPUBITS]; +} cpu_set_t; + +#define CPU_SET(cpu, cpusetp) \ + ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (1UL << ((cpu) % __NCPUBITS))) +#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t)) +#define CPU_ISSET(cpu, cpusetp) \ + (1UL << ((cpu) % __NCPUBITS)) == \ + ((cpusetp)->__bits[(cpu) / __NCPUBITS] & (1UL << ((cpu) % __NCPUBITS))) +#define CPU_EQUAL(left, right) (memcmp(&left, &right, sizeof(cpu_set_t)) == 0) + +#endif +#endif +#endif + namespace tvm { namespace runtime { namespace threading { @@ -64,6 +85,7 @@ class ThreadGroup { enum AffinityMode : int { kBig = 1, kLittle = -1, + kSpecify = -2, }; /*! @@ -71,6 +93,7 @@ class ThreadGroup { * * \param mode The preferred CPU type (1 = big, -1 = little). * \param nthreads The number of threads to use (0 = use all). + * \param cpus A list of cpu to use for affinity setting. * \param exclude_worker0 Whether to use the main thread as a worker. * If `true`, worker0 will not be launched in a new thread and * `worker_callback` will only be called for values >= 1. This @@ -78,7 +101,8 @@ class ThreadGroup { * * \return The number of workers to use. */ - int Configure(AffinityMode mode, int nthreads, bool exclude_worker0); + int Configure(AffinityMode mode, int nthreads, std::vector cpus, + bool exclude_worker0); private: Impl* impl_; @@ -94,6 +118,8 @@ void Yield(); */ int MaxConcurrency(); +void SetMaxConcurrency(int value); + /*! * \brief Reset the threads in the pool. All current threads are destroyed and * new ones are created. @@ -102,6 +128,9 @@ int MaxConcurrency(); */ void ResetThreadPool(); +void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, + std::vector cpus, int max_concurrency = 0); + } // namespace threading } // namespace runtime } // namespace tvm diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index c8d1845266e8..10d8865dadb0 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -28,6 +28,7 @@ #include #include #include +#include #if TVM_THREADPOOL_USE_OPENMP #include #endif @@ -317,10 +318,11 @@ class ThreadPool { static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore::Get(); } - void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads) { + void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads, + const std::vector& cpus) { // this will also reset the affinity of the ThreadGroup // may use less than the MaxConcurrency number of workers - num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_); + num_workers_used_ = threads_->Configure(mode, nthreads, cpus, exclude_worker0_); // if MaxConcurrency restricted the number of workers (e.g., due to // hyperthreading), respect the restriction num_workers_used_ = std::min(num_workers_, num_workers_used_); @@ -337,7 +339,7 @@ class ThreadPool { new tvm::runtime::threading::ThreadGroup( num_workers_, [this](int worker_id) { this->RunWorker(worker_id); }, exclude_worker0_ /* include_main_thread */)); - num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_); + num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, {}, exclude_worker0_); } // Internal worker function. @@ -373,13 +375,28 @@ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRe threading::ThreadGroup::AffinityMode mode = static_cast(static_cast(args[0])); int nthreads = args[1]; - ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads); + std::vector cpus; + int max_concurrency = 0; + if (args.num_args == 3) { + Array cpu_array = args[2]; + for (auto cpu : cpu_array) { + cpus.push_back(cpu); + } + } + if (args.num_args == 4) { + max_concurrency = args[3]; + } + threading::Configure(mode, nthreads, cpus, max_concurrency); }); namespace threading { void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); } +void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, + std::vector cpus, int max_concurrency) { + tvm::runtime::threading::SetMaxConcurrency(max_concurrency); + tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus); +} } // namespace threading - } // namespace runtime } // namespace tvm diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 5b3093ac85cd..3811399cba47 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -24,8 +24,6 @@ #include #include -#include -#include #if defined(__linux__) || defined(__ANDROID__) #include #include @@ -37,11 +35,14 @@ #if defined(__hexagon__) #include #endif +#include +#include +#define CURRENT_THREAD_HANDLE ((pthread_t)-1) namespace tvm { namespace runtime { namespace threading { - +thread_local int max_concurrency_value = 0; class ThreadGroup::Impl { public: Impl(int num_workers, std::function worker_callback, bool exclude_worker0) @@ -60,12 +61,17 @@ class ThreadGroup::Impl { } } - int Configure(AffinityMode mode, int nthreads, bool exclude_worker0) { + int Configure(AffinityMode mode, int nthreads, std::vector cpus, + bool exclude_worker0) { int num_workers_used = 0; if (mode == kLittle) { num_workers_used = little_count_; } else if (mode == kBig) { num_workers_used = big_count_; + } else if (mode == kSpecify) { + num_workers_used = cpus.size(); + // Set the cpu list. + UpdateSortedOrder(cpus); } else { // use default num_workers_used = threading::MaxConcurrency(); @@ -79,71 +85,76 @@ class ThreadGroup::Impl { // and N/2 physical cores this will set affinity to the first N/2 logical // ones. num_workers_used = std::min(num_workers_, num_workers_used); - - const char* val = getenv("TVM_BIND_THREADS"); - if (val == nullptr || atoi(val) == 1) { - // Do not set affinity if there are more workers than found cores - if (sorted_order_.size() >= static_cast(num_workers_)) { - SetAffinity(exclude_worker0, mode == kLittle); - } else { - LOG(WARNING) << "The thread affinity cannot be set when the number of workers" - << "is larger than the number of available cores in the system."; - } - } + SetAffinity(exclude_worker0, mode); return num_workers_used; } private: - // bind worker threads to disjoint cores - // if worker 0 is offloaded to main, i.e. exclude_worker0 is true, - // the main thread is bound to core 0. - void SetAffinity(bool exclude_worker0, bool reverse = false) { + void SetThreadAffinity(std::thread::native_handle_type thread, + const std::vector& ids) { +#if defined(__linux__) || defined(__ANDROID__) + if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) { + thread = pthread_self(); + } + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + for (auto id : ids) { + CPU_SET(id, &cpuset); + } #if defined(__ANDROID__) -#ifndef CPU_SET -#define CPU_SETSIZE 1024 -#define __NCPUBITS (8 * sizeof(uint64_t)) - typedef struct { - uint64_t __bits[CPU_SETSIZE / __NCPUBITS]; - } cpu_set_t; - -#define CPU_SET(cpu, cpusetp) \ - ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (1UL << ((cpu) % __NCPUBITS))) -#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t)) + sched_setaffinity(thread, sizeof(cpu_set_t), &cpuset); +#else + pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); #endif #endif -#if defined(__linux__) || defined(__ANDROID__) - ICHECK_GE(sorted_order_.size(), num_workers_); + } - for (unsigned i = 0; i < threads_.size(); ++i) { - unsigned core_id; - if (reverse) { - core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1]; + // bind worker threads to disjoint cores + // if worker 0 is offloaded to main, i.e. exclude_worker0 is true, + // the main thread is bound to core 0. + void SetAffinity(bool exclude_worker0, AffinityMode mode) { + const char* val = getenv("TVM_BIND_THREADS"); + if (val != nullptr && atoi(val) != 1) { + return; + } + // Do not set affinity if there are more workers than found cores and mode is not kSpecify. + if (sorted_order_.size() < static_cast(num_workers_)) { + if (mode == kSpecify) { + // if give a list of cpus and set mode as kSpecify need to restrict the threads + // on the said cpu list + for (unsigned i = 0; i < threads_.size(); ++i) { + SetThreadFullCpuAffinity(threads_[i].native_handle(), mode); + } + if (exclude_worker0) { // main thread run task + SetMasterThreadFullCpuAffinity(mode); + } } else { - core_id = sorted_order_[i + exclude_worker0]; + LOG(WARNING) << "The thread affinity cannot be set when the number of workers" + << "is larger than the number of available cores in the system."; + } + } else { + ICHECK_GE(sorted_order_.size(), num_workers_); + for (unsigned i = 0; i < threads_.size(); ++i) { + bool reverse = mode == kLittle; + unsigned core_id; + if (reverse) { + core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1]; + } else { + core_id = sorted_order_[i + exclude_worker0]; + } + SetThreadAffinity(threads_[i].native_handle(), {core_id}); + } + if (exclude_worker0) { // main thread run task + // Master thread will have free migration on needed cores. + // Typically, the OS will schedule the main thread to run at core 0, + // which is idle, when other workers are running. + // See the comment inside SetMasterThreadFullCpuAffinity function to get more detail. + SetMasterThreadFullCpuAffinity(mode); } - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(core_id, &cpuset); -#if defined(__ANDROID__) - sched_setaffinity(threads_[i].native_handle(), sizeof(cpu_set_t), &cpuset); -#else - pthread_setaffinity_np(threads_[i].native_handle(), sizeof(cpu_set_t), &cpuset); -#endif - } - if (exclude_worker0) { // main thread run task - // Master thread will have free migration on needed cores. - // Typically, the OS will schedule the main thread to run at core 0, - // which is idle, when other workers are running. - // See the comment inside SetMasterThreadFullCpuAffinity function to get more detail. - SetMasterThreadFullCpuAffinity(reverse); } -#endif } - void SetMasterThreadFullCpuAffinity(bool reverse) { -#if defined(__linux__) || defined(__ANDROID__) - cpu_set_t cpuset; - CPU_ZERO(&cpuset); + void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) { // For example, we have 2xA72 + 4xA53 (id is 0 - 5, 4, 5 is A72 big core) // And we use config_threadpool API to set we will only use 4xA53. // The sorted_order will be [4, 5, 0, 1, 2, 3]. @@ -154,22 +165,31 @@ class ThreadGroup::Impl { // Note: this works well on x86 too. Because x86 doesn't have BIG.LITTLE, // our implementation will use kBig mode by default and will let main thread // run on intended cores. - if (reverse) { + std::vector ids; + if (mode == kSpecify) { + for (size_t i = 0; i < sorted_order_.size(); ++i) { + ids.push_back(sorted_order_[i]); + } + } else if (mode == kLittle) { for (int i = 0; i < little_count_; ++i) { - CPU_SET(sorted_order_[sorted_order_.size() - i - 1], &cpuset); + ids.push_back(sorted_order_[sorted_order_.size() - i - 1]); } } else { int num_cpu_workers = std::min(MaxConcurrency(), big_count_); for (int i = 0; i < num_cpu_workers; ++i) { - CPU_SET(sorted_order_[i], &cpuset); + ids.push_back(sorted_order_[i]); } } -#if defined(__ANDROID__) - sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset); -#else - pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); -#endif -#endif + SetThreadAffinity(thread, ids); + } + + void SetMasterThreadFullCpuAffinity(AffinityMode mode) { + SetThreadFullCpuAffinity(CURRENT_THREAD_HANDLE, mode); + } + + void UpdateSortedOrder(std::vector cpus) { + sorted_order_ = cpus; + return; } void InitSortedOrder() { @@ -231,36 +251,45 @@ ThreadGroup::ThreadGroup(int num_workers, std::function worker_callba ThreadGroup::~ThreadGroup() { delete impl_; } void ThreadGroup::Join() { impl_->Join(); } -int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0) { - return impl_->Configure(mode, nthreads, exclude_worker0); +int ThreadGroup::Configure(AffinityMode mode, int nthreads, std::vector cpus, + bool exclude_worker0) { + return impl_->Configure(mode, nthreads, cpus, exclude_worker0); } void Yield() { std::this_thread::yield(); } - +void SetMaxConcurrency(int value) { + if (value > 0) { + max_concurrency_value = value; + } +} int MaxConcurrency() { int max_concurrency = 1; - const char* val = getenv("TVM_NUM_THREADS"); - if (val == nullptr) { - val = getenv("OMP_NUM_THREADS"); - } - if (val != nullptr) { - max_concurrency = atoi(val); + if (tvm::runtime::threading::max_concurrency_value != 0) { + max_concurrency = tvm::runtime::threading::max_concurrency_value; } else { - max_concurrency = std::thread::hardware_concurrency(); + const char* val = getenv("TVM_NUM_THREADS"); + if (val == nullptr) { + val = getenv("OMP_NUM_THREADS"); + } + if (val != nullptr) { + max_concurrency = atoi(val); + } else { + max_concurrency = std::thread::hardware_concurrency(); #if defined(_M_X64) || defined(__x86_64__) - max_concurrency /= 2; // ignore hyper-threading + max_concurrency /= 2; // ignore hyper-threading #elif defined(__hexagon__) - // With unsigned PDs, getting the number of available hardware threads - // is not supported in earlier versions of QuRT. In such cases assume 4. - // If running on simulator, set max_concurrency to 1. - if (max_concurrency == 0) { - if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7")) { - max_concurrency = 1; - } else { - max_concurrency = 4; + // With unsigned PDs, getting the number of available hardware threads + // is not supported in earlier versions of QuRT. In such cases assume 4. + // If running on simulator, set max_concurrency to 1. + if (max_concurrency == 0) { + if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7")) { + max_concurrency = 1; + } else { + max_concurrency = 4; + } } - } #endif + } } return std::max(max_concurrency, 1); } diff --git a/tests/cpp/threading_backend_test.cc b/tests/cpp/threading_backend_test.cc index 948838971796..d697c7dc3075 100644 --- a/tests/cpp/threading_backend_test.cc +++ b/tests/cpp/threading_backend_test.cc @@ -17,22 +17,102 @@ * under the License. */ +#include #include #include +#include #include #include +#include #include +#include +#include constexpr size_t N = 128; +void AtomicCompute(int task_id, size_t n, std::atomic* acc, TVMParallelGroupEnv* penv) { + const size_t N_per_task = (n + penv->num_task - 1) / penv->num_task; + for (size_t i = task_id * N_per_task; i < n && i < (task_id + 1) * N_per_task; ++i) { + acc->fetch_add(i, std::memory_order_relaxed); + } + return; +} + +class AffinityCheck { + public: + AffinityCheck(uint32_t parent_id, int max_concurrency, std::atomic* acc) + : id_(parent_id), max_concurrency_(max_concurrency), acc_(acc) {} + + void Compute(int task_id, size_t n, TVMParallelGroupEnv* penv) { + AtomicCompute(task_id, n, acc_, penv); + } + + int GetComputeResult() { return acc_->load(std::memory_order_relaxed); } + + void GetAffinity(int task_id) { +#if defined(__linux__) + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + std::lock_guard lock(mutex_); + thread_affinity_[task_id] = cpuset; + // Print the affinity information of current thread into log. + std::ostringstream str; + for (int i = 0; i < max_concurrency_; i++) { + if (CPU_ISSET(i, &cpuset)) { + str << i << ","; + } + } + LOG(INFO) << "id:" << id_ << " taskid:" << task_id << " affinity:" << str.str() << std::endl; +#endif + } + + bool VerifyAffinity(const std::vector& cpus) { +#if defined(__linux__) + std::unordered_set uset; + cpu_set_t cpu_mask; + CPU_ZERO(&cpu_mask); + for (auto x : cpus) { + CPU_SET(x, &cpu_mask); + uset.insert(x); + } + + for (auto x : thread_affinity_) { + if (!CPU_EQUAL(&cpu_mask, &x.second)) { + bool cpu_find = false; + for (auto cpu : uset) { + CPU_ISSET(cpu, &x.second); + uset.erase(cpu); + cpu_find = true; + break; + } + if (!cpu_find) return false; + } + } +#endif + return true; + } + + private: + uint32_t id_; + int max_concurrency_; + std::atomic* acc_; + std::mutex mutex_; + std::unordered_map thread_affinity_; +}; static FTVMParallelLambda atomic_add_task_id = [](int task_id, TVMParallelGroupEnv* penv, void* cdata) -> int { auto* data = reinterpret_cast*>(cdata); - const size_t N_per_task = (N + penv->num_task - 1) / penv->num_task; - for (size_t i = task_id * N_per_task; i < N && i < (task_id + 1) * N_per_task; ++i) { - data->fetch_add(i, std::memory_order_relaxed); - } + AtomicCompute(task_id, N, data, penv); + return 0; +}; + +static FTVMParallelLambda affinity_check_task_id = [](int task_id, TVMParallelGroupEnv* penv, + void* cdata) -> int { + auto* data = reinterpret_cast(cdata); + data->Compute(task_id, N, penv); + data->GetAffinity(task_id); return 0; }; @@ -63,3 +143,43 @@ TEST(ThreadingBackend, TVMBackendParallelLaunchMultipleThreads) { } } } + +TEST(ThreadingBackend, TVMBackendAffinityConfigure) { + int max_concurrency = tvm::runtime::threading::MaxConcurrency(); + std::vector> ts; + if (max_concurrency <= 1) { + return; + } + const int thread_pool_num = max_concurrency > 1 ? 2 : 1; + const int cpus_num_per_pool = max_concurrency / thread_pool_num; + std::vector concurrency = {0, 3}; + for (auto value : concurrency) { + for (int i = 0; i < thread_pool_num; i++) { + ts.emplace_back(new std::thread( + [&](int j, int sys_max_concurrency, int concurrency_config) { + std::atomic acc(0); + AffinityCheck ac(j, sys_max_concurrency, &acc); + std::vector cpus; + for (int k = 0; k < cpus_num_per_pool; k++) { + cpus.push_back(j * cpus_num_per_pool + k); + } + if (concurrency_config != 0) { + // Setting max concurrency number as 3 as well as setting the affinity cpu list. + tvm::runtime::threading ::Configure(tvm::runtime::threading::ThreadGroup::kSpecify, 0, + cpus, concurrency_config); + } else { + // Setting max concurrency as no limitation as well as setting the affinity cpu list. + tvm::runtime::threading ::Configure(tvm::runtime::threading::ThreadGroup::kSpecify, 0, + cpus); + } + TVMBackendParallelLaunch(affinity_check_task_id, &ac, 0); + EXPECT_EQ(ac.GetComputeResult(), N * (N - 1) / 2); + EXPECT_EQ(ac.VerifyAffinity(cpus), true); + }, + i, max_concurrency, value)); + } + } + for (auto& t : ts) { + t->join(); + } +} From 1b457e97483521bf035d9c12cf02b0416603e619 Mon Sep 17 00:00:00 2001 From: hua jiang Date: Mon, 27 Dec 2021 19:41:04 -0800 Subject: [PATCH 02/11] fix windows build issue. --- src/runtime/threading_backend.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 3811399cba47..ff43d80d143c 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -37,7 +37,7 @@ #endif #include #include -#define CURRENT_THREAD_HANDLE ((pthread_t)-1) +#define CURRENT_THREAD_HANDLE (reinterpret_cast(nullptr)) namespace tvm { namespace runtime { From 0c8d189acd0fad5a16c3e5f87f7ff3e7d9bfd16a Mon Sep 17 00:00:00 2001 From: hua jiang Date: Mon, 27 Dec 2021 19:41:04 -0800 Subject: [PATCH 03/11] fix build issue. --- src/runtime/threading_backend.cc | 2 +- tests/cpp/threading_backend_test.cc | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index ff43d80d143c..8b4c54edcbaf 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -37,7 +37,7 @@ #endif #include #include -#define CURRENT_THREAD_HANDLE (reinterpret_cast(nullptr)) +#define CURRENT_THREAD_HANDLE (static_cast(nullptr)) namespace tvm { namespace runtime { diff --git a/tests/cpp/threading_backend_test.cc b/tests/cpp/threading_backend_test.cc index d697c7dc3075..0d8a47494385 100644 --- a/tests/cpp/threading_backend_test.cc +++ b/tests/cpp/threading_backend_test.cc @@ -98,7 +98,9 @@ class AffinityCheck { int max_concurrency_; std::atomic* acc_; std::mutex mutex_; +#if defined(__linux__) std::unordered_map thread_affinity_; +#endif }; static FTVMParallelLambda atomic_add_task_id = [](int task_id, TVMParallelGroupEnv* penv, From d8ec871ff23152f3a68622d61b2c88e19bc5e627 Mon Sep 17 00:00:00 2001 From: hua jiang Date: Thu, 30 Dec 2021 23:59:43 -0800 Subject: [PATCH 04/11] fix build issue. --- src/runtime/threading_backend.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 8b4c54edcbaf..21ebba8a1f8c 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -37,7 +37,7 @@ #endif #include #include -#define CURRENT_THREAD_HANDLE (static_cast(nullptr)) +#define CURRENT_THREAD_HANDLE (static_cast(0)) namespace tvm { namespace runtime { From 7922d6e70ce84a213176585319ce68a4a221317b Mon Sep 17 00:00:00 2001 From: hua jiang Date: Thu, 6 Jan 2022 14:46:13 -0800 Subject: [PATCH 05/11] fix windows build issue. --- src/runtime/thread_pool.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 10d8865dadb0..7b8cb01f37b4 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -28,7 +28,7 @@ #include #include #include -#include +#include #if TVM_THREADPOOL_USE_OPENMP #include #endif @@ -378,9 +378,9 @@ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRe std::vector cpus; int max_concurrency = 0; if (args.num_args == 3) { - Array cpu_array = args[2]; + Array cpu_array = args[2]; for (auto cpu : cpu_array) { - cpus.push_back(cpu); + cpus.push_back(std::stoi(cpu)); } } if (args.num_args == 4) { From b3cdd2a91bc7885bc7c3be92d736877e0c5f9b91 Mon Sep 17 00:00:00 2001 From: hua jiang Date: Thu, 6 Jan 2022 15:19:04 -0800 Subject: [PATCH 06/11] fix plint issue --- src/runtime/thread_pool.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 7b8cb01f37b4..94a04d8a5830 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -24,11 +24,11 @@ #include #include #include +#include #include #include #include #include -#include #if TVM_THREADPOOL_USE_OPENMP #include #endif From 937aa77c655c22e5d881e7365649989cb0cd1e9f Mon Sep 17 00:00:00 2001 From: huajsj Date: Mon, 31 Jan 2022 21:27:39 -0800 Subject: [PATCH 07/11] polish comments. --- include/tvm/runtime/threading_backend.h | 2 +- src/runtime/threading_backend.cc | 8 +------ tests/cpp/threading_backend_test.cc | 31 ++++++++++++++----------- 3 files changed, 20 insertions(+), 21 deletions(-) diff --git a/include/tvm/runtime/threading_backend.h b/include/tvm/runtime/threading_backend.h index 976a0172be64..6b8238a2e457 100644 --- a/include/tvm/runtime/threading_backend.h +++ b/include/tvm/runtime/threading_backend.h @@ -93,7 +93,7 @@ class ThreadGroup { * * \param mode The preferred CPU type (1 = big, -1 = little). * \param nthreads The number of threads to use (0 = use all). - * \param cpus A list of cpu to use for affinity setting. + * \param cpus A list of CPU used to set 'cpu affinity'. * \param exclude_worker0 Whether to use the main thread as a worker. * If `true`, worker0 will not be launched in a new thread and * `worker_callback` will only be called for values >= 1. This diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 21ebba8a1f8c..4acea773a1a2 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -70,8 +70,7 @@ class ThreadGroup::Impl { num_workers_used = big_count_; } else if (mode == kSpecify) { num_workers_used = cpus.size(); - // Set the cpu list. - UpdateSortedOrder(cpus); + sorted_order_ = cpus; } else { // use default num_workers_used = threading::MaxConcurrency(); @@ -187,11 +186,6 @@ class ThreadGroup::Impl { SetThreadFullCpuAffinity(CURRENT_THREAD_HANDLE, mode); } - void UpdateSortedOrder(std::vector cpus) { - sorted_order_ = cpus; - return; - } - void InitSortedOrder() { unsigned int threads = std::thread::hardware_concurrency(); #if defined(__hexagon__) diff --git a/tests/cpp/threading_backend_test.cc b/tests/cpp/threading_backend_test.cc index 0d8a47494385..ec2decae66e8 100644 --- a/tests/cpp/threading_backend_test.cc +++ b/tests/cpp/threading_backend_test.cc @@ -56,7 +56,7 @@ class AffinityCheck { pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); std::lock_guard lock(mutex_); thread_affinity_[task_id] = cpuset; - // Print the affinity information of current thread into log. + // Printing the current thread CPU affinity. std::ostringstream str; for (int i = 0; i < max_concurrency_; i++) { if (CPU_ISSET(i, &cpuset)) { @@ -149,28 +149,33 @@ TEST(ThreadingBackend, TVMBackendParallelLaunchMultipleThreads) { TEST(ThreadingBackend, TVMBackendAffinityConfigure) { int max_concurrency = tvm::runtime::threading::MaxConcurrency(); std::vector> ts; + // Returning as there is only one CPU available. if (max_concurrency <= 1) { return; } - const int thread_pool_num = max_concurrency > 1 ? 2 : 1; - const int cpus_num_per_pool = max_concurrency / thread_pool_num; + // Creating two threads to test the 'CPU list affinity' feature. + const int threads_num = 2; + // Getting the maximum number of CPU which is available to each thread. + const int cpus_num_per_thread = max_concurrency / threads_num; + // Testing two scenario of concurrency. The '0' means there is no concurrency limitaion, + // The '3' means the value of maximium concurrency is '3'. std::vector concurrency = {0, 3}; - for (auto value : concurrency) { - for (int i = 0; i < thread_pool_num; i++) { + for (auto concurrency_value : concurrency) { + for (int thread_pool_idx = 0; thread_pool_idx < threads_num; thread_pool_idx++) { ts.emplace_back(new std::thread( - [&](int j, int sys_max_concurrency, int concurrency_config) { + [&](int thread_pool_index, int sys_max_concurrency, int concurrency_config) { std::atomic acc(0); - AffinityCheck ac(j, sys_max_concurrency, &acc); + AffinityCheck ac(thread_pool_index, sys_max_concurrency, &acc); std::vector cpus; - for (int k = 0; k < cpus_num_per_pool; k++) { - cpus.push_back(j * cpus_num_per_pool + k); + for (int k = 0; k < cpus_num_per_thread; k++) { + cpus.push_back(thread_pool_index * cpus_num_per_thread + k); } - if (concurrency_config != 0) { - // Setting max concurrency number as 3 as well as setting the affinity cpu list. + if (concurrency_config > 0) { + // Testing the 'Configure' functin with the 'max_concurrency' parameter. tvm::runtime::threading ::Configure(tvm::runtime::threading::ThreadGroup::kSpecify, 0, cpus, concurrency_config); } else { - // Setting max concurrency as no limitation as well as setting the affinity cpu list. + // Testing the 'Configure' function without the 'max_concurrency' parameter. tvm::runtime::threading ::Configure(tvm::runtime::threading::ThreadGroup::kSpecify, 0, cpus); } @@ -178,7 +183,7 @@ TEST(ThreadingBackend, TVMBackendAffinityConfigure) { EXPECT_EQ(ac.GetComputeResult(), N * (N - 1) / 2); EXPECT_EQ(ac.VerifyAffinity(cpus), true); }, - i, max_concurrency, value)); + thread_pool_idx, max_concurrency, concurrency_value)); } } for (auto& t : ts) { From ada242cc62f8cc7e2795fd0ddf77a3d3144de203 Mon Sep 17 00:00:00 2001 From: huajsj Date: Wed, 16 Feb 2022 23:31:18 -0800 Subject: [PATCH 08/11] address review comments. --- include/tvm/runtime/threading_backend.h | 6 +++--- src/runtime/thread_pool.cc | 13 ++++++++++--- src/runtime/threading_backend.cc | 18 +++++++++--------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/include/tvm/runtime/threading_backend.h b/include/tvm/runtime/threading_backend.h index 6b8238a2e457..3e583f864f20 100644 --- a/include/tvm/runtime/threading_backend.h +++ b/include/tvm/runtime/threading_backend.h @@ -93,16 +93,16 @@ class ThreadGroup { * * \param mode The preferred CPU type (1 = big, -1 = little). * \param nthreads The number of threads to use (0 = use all). - * \param cpus A list of CPU used to set 'cpu affinity'. * \param exclude_worker0 Whether to use the main thread as a worker. * If `true`, worker0 will not be launched in a new thread and * `worker_callback` will only be called for values >= 1. This * allows use of the main thread as a worker. + * \param cpus A list of CPU used to set 'cpu affinity'. * * \return The number of workers to use. */ - int Configure(AffinityMode mode, int nthreads, std::vector cpus, - bool exclude_worker0); + int Configure(AffinityMode mode, int nthreads, bool exclude_worker0, + std::vector cpus = {}); private: Impl* impl_; diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 94a04d8a5830..70ef3e7bacd7 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -43,12 +43,13 @@ #include #include +#include "../support/utils.h" const constexpr int kL1CacheBytes = 64; namespace tvm { namespace runtime { namespace { - +using support::IsNumber; constexpr uint32_t kDefaultSpinCount = 300000; uint32_t GetSpinCount() { @@ -322,7 +323,7 @@ class ThreadPool { const std::vector& cpus) { // this will also reset the affinity of the ThreadGroup // may use less than the MaxConcurrency number of workers - num_workers_used_ = threads_->Configure(mode, nthreads, cpus, exclude_worker0_); + num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_, cpus); // if MaxConcurrency restricted the number of workers (e.g., due to // hyperthreading), respect the restriction num_workers_used_ = std::min(num_workers_, num_workers_used_); @@ -339,7 +340,7 @@ class ThreadPool { new tvm::runtime::threading::ThreadGroup( num_workers_, [this](int worker_id) { this->RunWorker(worker_id); }, exclude_worker0_ /* include_main_thread */)); - num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, {}, exclude_worker0_); + num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_); } // Internal worker function. @@ -371,6 +372,11 @@ class ThreadPool { std::unique_ptr threads_; }; +/*! + * \brief args[0] is the AffinityMode, args[1] is the number of threads. + * args[2](optional) is a list id of CPU which is used to set CPU affinity. + * args[3](optional) is the maximum numbers of worker threads which is create to run the task. + */ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRetValue* rv) { threading::ThreadGroup::AffinityMode mode = static_cast(static_cast(args[0])); @@ -380,6 +386,7 @@ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRe if (args.num_args == 3) { Array cpu_array = args[2]; for (auto cpu : cpu_array) { + ICHECK(IsNumber(cpu)) << "The CPU core information '" << cpu << "' is not a number."; cpus.push_back(std::stoi(cpu)); } } diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 4acea773a1a2..709a54de0216 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -42,7 +42,7 @@ namespace tvm { namespace runtime { namespace threading { -thread_local int max_concurrency_value = 0; +thread_local int max_concurrency = 0; class ThreadGroup::Impl { public: Impl(int num_workers, std::function worker_callback, bool exclude_worker0) @@ -61,8 +61,8 @@ class ThreadGroup::Impl { } } - int Configure(AffinityMode mode, int nthreads, std::vector cpus, - bool exclude_worker0) { + int Configure(AffinityMode mode, int nthreads, bool exclude_worker0, + std::vector cpus) { int num_workers_used = 0; if (mode == kLittle) { num_workers_used = little_count_; @@ -245,21 +245,21 @@ ThreadGroup::ThreadGroup(int num_workers, std::function worker_callba ThreadGroup::~ThreadGroup() { delete impl_; } void ThreadGroup::Join() { impl_->Join(); } -int ThreadGroup::Configure(AffinityMode mode, int nthreads, std::vector cpus, - bool exclude_worker0) { - return impl_->Configure(mode, nthreads, cpus, exclude_worker0); +int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0, + std::vector cpus) { + return impl_->Configure(mode, nthreads, exclude_worker0, cpus); } void Yield() { std::this_thread::yield(); } void SetMaxConcurrency(int value) { if (value > 0) { - max_concurrency_value = value; + max_concurrency = value; } } int MaxConcurrency() { int max_concurrency = 1; - if (tvm::runtime::threading::max_concurrency_value != 0) { - max_concurrency = tvm::runtime::threading::max_concurrency_value; + if (tvm::runtime::threading::max_concurrency != 0) { + max_concurrency = tvm::runtime::threading::max_concurrency; } else { const char* val = getenv("TVM_NUM_THREADS"); if (val == nullptr) { From 13fe14772a09bce3a6254968f8de463c60b3cc5e Mon Sep 17 00:00:00 2001 From: huajsj Date: Thu, 24 Feb 2022 15:10:45 -0800 Subject: [PATCH 09/11] address reivew comments. --- include/tvm/runtime/threading_backend.h | 22 ++++-- src/runtime/thread_pool.cc | 16 +++-- src/runtime/threading_backend.cc | 93 +++++++++++++++---------- 3 files changed, 84 insertions(+), 47 deletions(-) diff --git a/include/tvm/runtime/threading_backend.h b/include/tvm/runtime/threading_backend.h index 3e583f864f20..4c8815e25d59 100644 --- a/include/tvm/runtime/threading_backend.h +++ b/include/tvm/runtime/threading_backend.h @@ -85,13 +85,15 @@ class ThreadGroup { enum AffinityMode : int { kBig = 1, kLittle = -1, - kSpecify = -2, + /*Different thread will get different affinity.*/ + kSpecifyPerCorePerThread = -2, + /*All threads get same core group affinity.*/ + kSepcifyAllThreadAllCore = -3, }; - /*! * \brief configure the CPU id affinity * - * \param mode The preferred CPU type (1 = big, -1 = little). + * \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify). * \param nthreads The number of threads to use (0 = use all). * \param exclude_worker0 Whether to use the main thread as a worker. * If `true`, worker0 will not be launched in a new thread and @@ -112,14 +114,14 @@ class ThreadGroup { * \brief Platform-agnostic no-op. */ void Yield(); - /*! * \return the maximum number of effective workers for this system. */ int MaxConcurrency(); - +/*! + * \bief Set the maximum number of available cores. + */ void SetMaxConcurrency(int value); - /*! * \brief Reset the threads in the pool. All current threads are destroyed and * new ones are created. @@ -128,8 +130,14 @@ void SetMaxConcurrency(int value); */ void ResetThreadPool(); +/*! + * \brief configure the CPU id affinity + * \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify). + * \param nthreads The number of threads to use (0 = use all). + * \param cpus A list of CPU used to set 'cpu affinity'. + */ void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, - std::vector cpus, int max_concurrency = 0); + std::vector cpus); } // namespace threading } // namespace runtime diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 70ef3e7bacd7..90c2cbe8d6d7 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -375,22 +375,21 @@ class ThreadPool { /*! * \brief args[0] is the AffinityMode, args[1] is the number of threads. * args[2](optional) is a list id of CPU which is used to set CPU affinity. - * args[3](optional) is the maximum numbers of worker threads which is create to run the task. */ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRetValue* rv) { threading::ThreadGroup::AffinityMode mode = static_cast(static_cast(args[0])); int nthreads = args[1]; - std::vector cpus; + // The maximum number of available cores. int max_concurrency = 0; - if (args.num_args == 3) { + std::vector cpus; + if (args.num_args >= 3) { Array cpu_array = args[2]; for (auto cpu : cpu_array) { ICHECK(IsNumber(cpu)) << "The CPU core information '" << cpu << "' is not a number."; cpus.push_back(std::stoi(cpu)); + std::cout << "cpu is " << cpu << std::endl; } - } - if (args.num_args == 4) { max_concurrency = args[3]; } threading::Configure(mode, nthreads, cpus, max_concurrency); @@ -398,6 +397,13 @@ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRe namespace threading { void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); } +/*! + * \brief configure the CPU id affinity + * \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify). + * \param nthreads The number of threads to use (0 = use all). + * \param cpus A list of CPU used to set 'cpu affinity'. + * + */ void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, std::vector cpus, int max_concurrency) { tvm::runtime::threading::SetMaxConcurrency(max_concurrency); diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 709a54de0216..20784aa5cbfa 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -38,7 +38,6 @@ #include #include #define CURRENT_THREAD_HANDLE (static_cast(0)) - namespace tvm { namespace runtime { namespace threading { @@ -118,30 +117,46 @@ class ThreadGroup::Impl { } // Do not set affinity if there are more workers than found cores and mode is not kSpecify. if (sorted_order_.size() < static_cast(num_workers_)) { - if (mode == kSpecify) { - // if give a list of cpus and set mode as kSpecify need to restrict the threads - // on the said cpu list - for (unsigned i = 0; i < threads_.size(); ++i) { - SetThreadFullCpuAffinity(threads_[i].native_handle(), mode); - } - if (exclude_worker0) { // main thread run task - SetMasterThreadFullCpuAffinity(mode); - } - } else { - LOG(WARNING) << "The thread affinity cannot be set when the number of workers" - << "is larger than the number of available cores in the system."; + switch (mode) { + case kSpecifyPerCorePerThread: + case kSepcifyAllThreadAllCore: + // if give a list of cpus and set mode as kSpecify need to restrict the threads + // on the said cpu list + for (unsigned i = 0; i < threads_.size(); ++i) { + SetThreadFullCpuAffinity(threads_[i].native_handle(), mode); + } + if (exclude_worker0) { // main thread run task + SetMasterThreadFullCpuAffinity(mode); + } + break; + case kLittle: + case kBig: + LOG(WARNING) << "The thread affinity cannot be set when the number of workers" + << "is larger than the number of available cores in the system."; + break; } } else { ICHECK_GE(sorted_order_.size(), num_workers_); - for (unsigned i = 0; i < threads_.size(); ++i) { - bool reverse = mode == kLittle; - unsigned core_id; - if (reverse) { - core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1]; - } else { - core_id = sorted_order_[i + exclude_worker0]; - } - SetThreadAffinity(threads_[i].native_handle(), {core_id}); + switch (mode) { + case kSepcifyAllThreadAllCore: + for (unsigned i = 0; i < threads_.size(); ++i) { + SetThreadFullCpuAffinity(threads_[i].native_handle(), mode); + } + break; + case kLittle: + case kBig: + case kSpecifyPerCorePerThread: + for (unsigned i = 0; i < threads_.size(); ++i) { + bool reverse = mode == kLittle; + unsigned core_id; + if (reverse) { + core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1]; + } else { + core_id = sorted_order_[i + exclude_worker0]; + } + SetThreadAffinity(threads_[i].native_handle(), {core_id}); + } + break; } if (exclude_worker0) { // main thread run task // Master thread will have free migration on needed cores. @@ -165,19 +180,24 @@ class ThreadGroup::Impl { // our implementation will use kBig mode by default and will let main thread // run on intended cores. std::vector ids; - if (mode == kSpecify) { - for (size_t i = 0; i < sorted_order_.size(); ++i) { - ids.push_back(sorted_order_[i]); - } - } else if (mode == kLittle) { - for (int i = 0; i < little_count_; ++i) { - ids.push_back(sorted_order_[sorted_order_.size() - i - 1]); - } - } else { - int num_cpu_workers = std::min(MaxConcurrency(), big_count_); - for (int i = 0; i < num_cpu_workers; ++i) { - ids.push_back(sorted_order_[i]); - } + switch(mode) { + case kSpecifyPerCorePerThread: + case kSepcifyAllThreadAllCore: + for (size_t i = 0; i < sorted_order_.size(); ++i) { + ids.push_back(sorted_order_[i]); + } + break; + case kLittle: + for (int i = 0; i < little_count_; ++i) { + ids.push_back(sorted_order_[sorted_order_.size() - i - 1]); + } + break; + case kBig: + int num_cpu_workers = std::min(MaxConcurrency(), big_count_); + for (int i = 0; i < num_cpu_workers; ++i) { + ids.push_back(sorted_order_[i]); + } + break; } SetThreadAffinity(thread, ids); } @@ -251,6 +271,9 @@ int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0 } void Yield() { std::this_thread::yield(); } +/*! + * \bief Set the maximum number of available cores. + */ void SetMaxConcurrency(int value) { if (value > 0) { max_concurrency = value; From 770fc92838e43a85809a75bcf247afc405f0a2a4 Mon Sep 17 00:00:00 2001 From: huajsj Date: Thu, 24 Feb 2022 23:05:15 -0800 Subject: [PATCH 10/11] address review comments. --- include/tvm/runtime/threading_backend.h | 16 ++++----- src/runtime/thread_pool.cc | 11 +++--- src/runtime/threading_backend.cc | 45 ++++++++++++++----------- tests/cpp/threading_backend_test.cc | 25 +++++--------- 4 files changed, 46 insertions(+), 51 deletions(-) diff --git a/include/tvm/runtime/threading_backend.h b/include/tvm/runtime/threading_backend.h index 4c8815e25d59..a6ca30bf2919 100644 --- a/include/tvm/runtime/threading_backend.h +++ b/include/tvm/runtime/threading_backend.h @@ -85,15 +85,15 @@ class ThreadGroup { enum AffinityMode : int { kBig = 1, kLittle = -1, - /*Different thread will get different affinity.*/ - kSpecifyPerCorePerThread = -2, - /*All threads get same core group affinity.*/ - kSepcifyAllThreadAllCore = -3, + /*Different threads will get different affinities.*/ + kSpecifyOneCorePerThread = -2, + /*All threads will get the same core group affinity.*/ + kSpecifyThreadShareAllCore = -3, }; /*! * \brief configure the CPU id affinity * - * \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify). + * \param mode The preferred CPU type (1 = big, -1 = little ...). * \param nthreads The number of threads to use (0 = use all). * \param exclude_worker0 Whether to use the main thread as a worker. * If `true`, worker0 will not be launched in a new thread and @@ -119,7 +119,7 @@ void Yield(); */ int MaxConcurrency(); /*! - * \bief Set the maximum number of available cores. + * \brief Setting the maximum number of available cores. */ void SetMaxConcurrency(int value); /*! @@ -131,10 +131,10 @@ void SetMaxConcurrency(int value); void ResetThreadPool(); /*! - * \brief configure the CPU id affinity + * \brief Configuring the CPU affinity mode for the working threads. * \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify). * \param nthreads The number of threads to use (0 = use all). - * \param cpus A list of CPU used to set 'cpu affinity'. + * \param cpus A list of CPUs is used to set the 'cpu affinity' for the worker threads. */ void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, std::vector cpus); diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 90c2cbe8d6d7..07d36801149b 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -374,14 +374,12 @@ class ThreadPool { /*! * \brief args[0] is the AffinityMode, args[1] is the number of threads. - * args[2](optional) is a list id of CPU which is used to set CPU affinity. + * args2 is a list of CPUs which is used to set the CPU affinity. */ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRetValue* rv) { threading::ThreadGroup::AffinityMode mode = static_cast(static_cast(args[0])); int nthreads = args[1]; - // The maximum number of available cores. - int max_concurrency = 0; std::vector cpus; if (args.num_args >= 3) { Array cpu_array = args[2]; @@ -390,9 +388,8 @@ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRe cpus.push_back(std::stoi(cpu)); std::cout << "cpu is " << cpu << std::endl; } - max_concurrency = args[3]; } - threading::Configure(mode, nthreads, cpus, max_concurrency); + threading::Configure(mode, nthreads, cpus); }); namespace threading { @@ -405,8 +402,8 @@ void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); } * */ void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, - std::vector cpus, int max_concurrency) { - tvm::runtime::threading::SetMaxConcurrency(max_concurrency); + std::vector cpus) { + tvm::runtime::threading::SetMaxConcurrency(cpus.size()); tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus); } } // namespace threading diff --git a/src/runtime/threading_backend.cc b/src/runtime/threading_backend.cc index 20784aa5cbfa..951e5eecd216 100644 --- a/src/runtime/threading_backend.cc +++ b/src/runtime/threading_backend.cc @@ -63,16 +63,21 @@ class ThreadGroup::Impl { int Configure(AffinityMode mode, int nthreads, bool exclude_worker0, std::vector cpus) { int num_workers_used = 0; - if (mode == kLittle) { - num_workers_used = little_count_; - } else if (mode == kBig) { - num_workers_used = big_count_; - } else if (mode == kSpecify) { - num_workers_used = cpus.size(); - sorted_order_ = cpus; - } else { - // use default - num_workers_used = threading::MaxConcurrency(); + switch (mode) { + case kLittle: + num_workers_used = little_count_; + break; + case kBig: + num_workers_used = big_count_; + break; + case kSpecifyOneCorePerThread: + case kSpecifyThreadShareAllCore: + num_workers_used = cpus.size(); + sorted_order_ = cpus; + break; + default: + // use default + num_workers_used = threading::MaxConcurrency(); } // if a specific number was given, use that if (nthreads) { @@ -115,13 +120,13 @@ class ThreadGroup::Impl { if (val != nullptr && atoi(val) != 1) { return; } - // Do not set affinity if there are more workers than found cores and mode is not kSpecify. + // Do not set affinity if there are more workers than found cores and mode is not kSpecify*. if (sorted_order_.size() < static_cast(num_workers_)) { switch (mode) { - case kSpecifyPerCorePerThread: - case kSepcifyAllThreadAllCore: - // if give a list of cpus and set mode as kSpecify need to restrict the threads - // on the said cpu list + // When the mode is kSpecifyOneCorePerThread or kSpecifyThreadShareAllCore, we should + // let the threads share all the cpu cores. + case kSpecifyOneCorePerThread: + case kSpecifyThreadShareAllCore: for (unsigned i = 0; i < threads_.size(); ++i) { SetThreadFullCpuAffinity(threads_[i].native_handle(), mode); } @@ -138,14 +143,14 @@ class ThreadGroup::Impl { } else { ICHECK_GE(sorted_order_.size(), num_workers_); switch (mode) { - case kSepcifyAllThreadAllCore: + case kSpecifyThreadShareAllCore: for (unsigned i = 0; i < threads_.size(); ++i) { SetThreadFullCpuAffinity(threads_[i].native_handle(), mode); } break; case kLittle: case kBig: - case kSpecifyPerCorePerThread: + case kSpecifyOneCorePerThread: for (unsigned i = 0; i < threads_.size(); ++i) { bool reverse = mode == kLittle; unsigned core_id; @@ -180,9 +185,9 @@ class ThreadGroup::Impl { // our implementation will use kBig mode by default and will let main thread // run on intended cores. std::vector ids; - switch(mode) { - case kSpecifyPerCorePerThread: - case kSepcifyAllThreadAllCore: + switch (mode) { + case kSpecifyOneCorePerThread: + case kSpecifyThreadShareAllCore: for (size_t i = 0; i < sorted_order_.size(); ++i) { ids.push_back(sorted_order_[i]); } diff --git a/tests/cpp/threading_backend_test.cc b/tests/cpp/threading_backend_test.cc index ec2decae66e8..1596bd038eb8 100644 --- a/tests/cpp/threading_backend_test.cc +++ b/tests/cpp/threading_backend_test.cc @@ -155,35 +155,28 @@ TEST(ThreadingBackend, TVMBackendAffinityConfigure) { } // Creating two threads to test the 'CPU list affinity' feature. const int threads_num = 2; - // Getting the maximum number of CPU which is available to each thread. + // Getting the maximum number of CPUs which are available to each thread. const int cpus_num_per_thread = max_concurrency / threads_num; - // Testing two scenario of concurrency. The '0' means there is no concurrency limitaion, - // The '3' means the value of maximium concurrency is '3'. - std::vector concurrency = {0, 3}; - for (auto concurrency_value : concurrency) { + // Testing two mode of affinity., + std::vector modes = { + tvm::runtime::threading::ThreadGroup::kSpecifyOneCorePerThread, + tvm::runtime::threading::ThreadGroup::kSpecifyThreadShareAllCore}; + for (auto mode : modes) { for (int thread_pool_idx = 0; thread_pool_idx < threads_num; thread_pool_idx++) { ts.emplace_back(new std::thread( - [&](int thread_pool_index, int sys_max_concurrency, int concurrency_config) { + [&](int thread_pool_index, int sys_max_concurrency) { std::atomic acc(0); AffinityCheck ac(thread_pool_index, sys_max_concurrency, &acc); std::vector cpus; for (int k = 0; k < cpus_num_per_thread; k++) { cpus.push_back(thread_pool_index * cpus_num_per_thread + k); } - if (concurrency_config > 0) { - // Testing the 'Configure' functin with the 'max_concurrency' parameter. - tvm::runtime::threading ::Configure(tvm::runtime::threading::ThreadGroup::kSpecify, 0, - cpus, concurrency_config); - } else { - // Testing the 'Configure' function without the 'max_concurrency' parameter. - tvm::runtime::threading ::Configure(tvm::runtime::threading::ThreadGroup::kSpecify, 0, - cpus); - } + tvm::runtime::threading ::Configure(mode, 0, cpus); TVMBackendParallelLaunch(affinity_check_task_id, &ac, 0); EXPECT_EQ(ac.GetComputeResult(), N * (N - 1) / 2); EXPECT_EQ(ac.VerifyAffinity(cpus), true); }, - thread_pool_idx, max_concurrency, concurrency_value)); + thread_pool_idx, max_concurrency)); } } for (auto& t : ts) { From 7ea0c2eef8f0750924c962ed07402e08e1d27ef0 Mon Sep 17 00:00:00 2001 From: huajsj Date: Fri, 25 Feb 2022 00:08:19 -0800 Subject: [PATCH 11/11] address review comments. --- include/tvm/runtime/threading_backend.h | 3 ++- src/runtime/thread_pool.cc | 5 +++-- tests/cpp/threading_backend_test.cc | 8 +++++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/include/tvm/runtime/threading_backend.h b/include/tvm/runtime/threading_backend.h index a6ca30bf2919..d44b273153d0 100644 --- a/include/tvm/runtime/threading_backend.h +++ b/include/tvm/runtime/threading_backend.h @@ -132,7 +132,8 @@ void ResetThreadPool(); /*! * \brief Configuring the CPU affinity mode for the working threads. - * \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify). + * \param mode The preferred CPU type (1 = big, -1 = little, -2 = kSpecifyOneCorePerThread, + * -3 = kSpecifyThreadShareAllCore). * \param nthreads The number of threads to use (0 = use all). * \param cpus A list of CPUs is used to set the 'cpu affinity' for the worker threads. */ diff --git a/src/runtime/thread_pool.cc b/src/runtime/thread_pool.cc index 07d36801149b..50b231a1db90 100644 --- a/src/runtime/thread_pool.cc +++ b/src/runtime/thread_pool.cc @@ -396,9 +396,10 @@ namespace threading { void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); } /*! * \brief configure the CPU id affinity - * \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify). + * \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify , + * -3 = kSpecifyOneCorePerThread, -3 = kSpecifyThreadShareAllCore). * \param nthreads The number of threads to use (0 = use all). - * \param cpus A list of CPU used to set 'cpu affinity'. + * \param cpus cpus A list of CPUs is used to set the 'cpu affinity' for the worker threads. * */ void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads, diff --git a/tests/cpp/threading_backend_test.cc b/tests/cpp/threading_backend_test.cc index 1596bd038eb8..db32623531b8 100644 --- a/tests/cpp/threading_backend_test.cc +++ b/tests/cpp/threading_backend_test.cc @@ -164,19 +164,21 @@ TEST(ThreadingBackend, TVMBackendAffinityConfigure) { for (auto mode : modes) { for (int thread_pool_idx = 0; thread_pool_idx < threads_num; thread_pool_idx++) { ts.emplace_back(new std::thread( - [&](int thread_pool_index, int sys_max_concurrency) { + [&](int thread_pool_index, int sys_max_concurrency, + tvm::runtime::threading::ThreadGroup::AffinityMode affinity_mode) { std::atomic acc(0); AffinityCheck ac(thread_pool_index, sys_max_concurrency, &acc); std::vector cpus; + std::cout << affinity_mode << std::endl; for (int k = 0; k < cpus_num_per_thread; k++) { cpus.push_back(thread_pool_index * cpus_num_per_thread + k); } - tvm::runtime::threading ::Configure(mode, 0, cpus); + tvm::runtime::threading ::Configure(affinity_mode, 0, cpus); TVMBackendParallelLaunch(affinity_check_task_id, &ac, 0); EXPECT_EQ(ac.GetComputeResult(), N * (N - 1) / 2); EXPECT_EQ(ac.VerifyAffinity(cpus), true); }, - thread_pool_idx, max_concurrency)); + thread_pool_idx, max_concurrency, mode)); } } for (auto& t : ts) {