Skip to content

Commit

Permalink
[Runtime][ThreadPool] Refactor affinity function and support CPU affi…
Browse files Browse the repository at this point in the history
…nity list setting.

Issue:
1. There are multiple affinity functions that each check the "LINUX" and "ANDROID"
macros, and these repeated checks make the logic complex to maintain and
change.

2. The current logic of tvm [Runtime][ThreadPool] assumes that all of the CPU resources are available to
a single backend runtime for the data flow computation. That assumption may not
hold when the user runs multiple tasks on the system and does not want the tvm task
to exhaust all of the CPU resources, or when the user runs multiple tvm backend
runtimes on the system, in which case each backend runtime should use a different CPU
affinity setting to achieve the best performance.

Solution:
1. Refactor the affinity functions to move the "LINUX" and "ANDROID" checks
into one function.

2. This solution introduces a new CPU "AffinityMode" type named "kSpecify". By using
"kSpecify" together with the function "tvm::runtime::threading::Configure", the user can specify
the CPU list for the CPU affinity of a backend runtime.

This solution reuses the existing per-thread thread pool logic of [Runtime][ThreadPool], which
creates a worker thread pool for the current thread that can run a particular runtime. For a multiple-runtime
use case, the user can first launch multiple threads and then call "tvm::runtime::threading::Configure"
with a CPU list to create the tvm data flow worker thread pool; after doing this, the multiple
runtimes executing on the multiple threads will use different CPU resource lists.
  • Loading branch information
huajsj authored and hua jiang committed Dec 28, 2021
1 parent e1255c9 commit 52ba194
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 95 deletions.
31 changes: 30 additions & 1 deletion include/tvm/runtime/threading_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@
#include <memory>
#include <vector>

#if defined(__linux__) || defined(__ANDROID__)
#if defined(__ANDROID__)
#ifndef CPU_SET
/*
 * Fallback cpu_set_t type and CPU_* helper macros. They are compiled only for
 * Android builds in which CPU_SET is not already defined.
 *
 * CPU_ZERO and CPU_EQUAL expand to memset/memcmp calls, so <cstring> must be
 * included wherever those two macros are used.
 */
#define CPU_SETSIZE 1024
#define __NCPUBITS (8 * sizeof(uint64_t))
typedef struct {
  uint64_t __bits[CPU_SETSIZE / __NCPUBITS];
} cpu_set_t;

/*
 * Mark `cpu` as a member of the set pointed to by `cpusetp`.
 * The mask is built from a 64-bit one because each word of __bits is 64 bits
 * wide: `1UL` is only 32 bits on ILP32 targets, and shifting it by 32 or more
 * is undefined behavior.
 */
#define CPU_SET(cpu, cpusetp) \
  ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (static_cast<uint64_t>(1) << ((cpu) % __NCPUBITS)))

/* Clear every bit of the set pointed to by `cpusetp`. */
#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t))

/*
 * Evaluate to true when `cpu` is a member of the set pointed to by `cpusetp`.
 * The whole expansion is parenthesized so that it can be combined safely with
 * other operators at the use site.
 */
#define CPU_ISSET(cpu, cpusetp) \
  ((((cpusetp)->__bits[(cpu) / __NCPUBITS] >> ((cpu) % __NCPUBITS)) & 1) != 0)

/*
 * Compare two cpu_set_t objects byte for byte.
 * Note that `left` and `right` are objects (lvalues), not pointers.
 */
#define CPU_EQUAL(left, right) (memcmp(&(left), &(right), sizeof(cpu_set_t)) == 0)

#endif
#endif
#endif

namespace tvm {
namespace runtime {
namespace threading {
Expand Down Expand Up @@ -64,21 +85,24 @@ class ThreadGroup {
enum AffinityMode : int {
// Prefer the big CPU type (mode value 1).
kBig = 1,
// Prefer the little CPU type (mode value -1).
kLittle = -1,
// Use a caller-specified list of CPUs for the affinity setting (mode value -2).
kSpecify = -2,
};

/*!
* \brief configure the CPU id affinity
*
* \param mode The preferred CPU type (1 = big, -1 = little, -2 = specify,
* i.e. take the CPUs to use from `cpus`).
* \param nthreads The number of threads to use (0 = use all).
* \param cpus A list of cpu to use for affinity setting.
* NOTE(review): presumably only consulted when `mode` is kSpecify; confirm
* against the implementation.
* \param exclude_worker0 Whether to use the main thread as a worker.
* If `true`, worker0 will not be launched in a new thread and
* `worker_callback` will only be called for values >= 1. This
* allows use of the main thread as a worker.
*
* \return The number of workers to use.
*/
int Configure(AffinityMode mode, int nthreads, bool exclude_worker0);
int Configure(AffinityMode mode, int nthreads, std::vector<unsigned int> cpus,
bool exclude_worker0);

private:
Impl* impl_;
Expand All @@ -94,6 +118,8 @@ void Yield();
*/
int MaxConcurrency();

/*!
 * \brief Set the max concurrency value.
 * NOTE(review): presumably this overrides the value reported by
 * MaxConcurrency(); confirm against the definition, including what a value
 * of 0 means.
 * \param value The value to set. threading::Configure() forwards its
 * `max_concurrency` argument here.
 */
void SetMaxConcurrency(int value);

/*!
* \brief Reset the threads in the pool. All current threads are destroyed and
* new ones are created.
Expand All @@ -102,6 +128,9 @@ int MaxConcurrency();
*/
void ResetThreadPool();

/*!
 * \brief Configure the thread pool of the calling thread.
 *
 * Calls SetMaxConcurrency(max_concurrency) and then updates the worker
 * configuration of the thread-local thread pool with `mode`, `nthreads`
 * and `cpus`.
 *
 * \param mode The affinity mode (kBig, kLittle or kSpecify).
 * \param nthreads The number of threads to use.
 * \param cpus A list of cpu ids to use for the affinity setting.
 * \param max_concurrency The value passed to SetMaxConcurrency(); defaults to 0.
 * NOTE(review): the meaning of 0 is not visible here; confirm against
 * SetMaxConcurrency().
 */
void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
std::vector<unsigned int> cpus, int max_concurrency = 0);

} // namespace threading
} // namespace runtime
} // namespace tvm
Expand Down
27 changes: 22 additions & 5 deletions src/runtime/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <tvm/runtime/packed_func.h>
#include <tvm/runtime/registry.h>
#include <tvm/runtime/threading_backend.h>
#include <tvm/tir/expr.h>
#if TVM_THREADPOOL_USE_OPENMP
#include <omp.h>
#endif
Expand Down Expand Up @@ -317,10 +318,11 @@ class ThreadPool {

static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore<ThreadPool>::Get(); }

void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads) {
void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads,
const std::vector<unsigned int>& cpus) {
// this will also reset the affinity of the ThreadGroup
// may use less than the MaxConcurrency number of workers
num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_);
num_workers_used_ = threads_->Configure(mode, nthreads, cpus, exclude_worker0_);
// if MaxConcurrency restricted the number of workers (e.g., due to
// hyperthreading), respect the restriction
num_workers_used_ = std::min(num_workers_, num_workers_used_);
Expand All @@ -337,7 +339,7 @@ class ThreadPool {
new tvm::runtime::threading::ThreadGroup(
num_workers_, [this](int worker_id) { this->RunWorker(worker_id); },
exclude_worker0_ /* include_main_thread */));
num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_);
num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, {}, exclude_worker0_);
}

// Internal worker function.
Expand Down Expand Up @@ -373,13 +375,28 @@ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRe
threading::ThreadGroup::AffinityMode mode =
static_cast<threading::ThreadGroup::AffinityMode>(static_cast<int>(args[0]));
int nthreads = args[1];
ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads);
// Optional trailing arguments: args[2] is the CPU list and args[3] is the
// max concurrency. Both keep their defaults when not supplied.
std::vector<unsigned int> cpus;
int max_concurrency = 0;
// Test with ">= 3" rather than "== 3": a call that also supplies
// max_concurrency has four arguments, and its CPU list must not be dropped.
if (args.num_args >= 3) {
  Array<Integer> cpu_array = args[2];
  for (auto cpu : cpu_array) {
    cpus.push_back(cpu);
  }
}
if (args.num_args >= 4) {
  max_concurrency = args[3];
}
threading::Configure(mode, nthreads, cpus, max_concurrency);
});

namespace threading {
void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); }
void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
std::vector<unsigned int> cpus, int max_concurrency) {
tvm::runtime::threading::SetMaxConcurrency(max_concurrency);
tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus);
}
} // namespace threading

} // namespace runtime
} // namespace tvm

Expand Down
Loading

0 comments on commit 52ba194

Please sign in to comment.