Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Runtime][ThreadPool]Refactor affinity function and support CPU affinity list setting. #9802

Merged
merged 11 commits into from
Mar 1, 2022
31 changes: 30 additions & 1 deletion include/tvm/runtime/threading_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@
#include <memory>
#include <vector>

#if defined(__linux__) || defined(__ANDROID__)
#if defined(__ANDROID__)
#ifndef CPU_SET
/*
 * Minimal fallback for the cpu_set_t helpers, compiled only for Android builds
 * whose headers do not already define CPU_SET.
 *
 * CPU_ZERO and CPU_EQUAL expand to memset/memcmp, so <string.h> (or <cstring>)
 * must be in scope wherever they are used.
 */
#define CPU_SETSIZE 1024
#define __NCPUBITS (8 * sizeof(uint64_t))
typedef struct {
  uint64_t __bits[CPU_SETSIZE / __NCPUBITS];
} cpu_set_t;

/*
 * Each word of __bits holds 64 CPUs, so the single-bit mask must be built from
 * a 64-bit one. With `1UL` the shift is undefined for cpu % 64 >= 32 on targets
 * where unsigned long is 32 bits wide (ILP32 ABIs).
 */
/* Mark `cpu` as a member of the set pointed to by `cpusetp`. */
#define CPU_SET(cpu, cpusetp) \
  ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (static_cast<uint64_t>(1) << ((cpu) % __NCPUBITS)))
/* Clear every CPU from the set pointed to by `cpusetp`. */
#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t))
/*
 * Evaluate to true when `cpu` is a member of the set pointed to by `cpusetp`.
 * The whole expansion is parenthesized so it composes safely with surrounding
 * operators (arithmetic, bitwise, comparisons).
 */
#define CPU_ISSET(cpu, cpusetp)              \
  ((((cpusetp)->__bits[(cpu) / __NCPUBITS]) & \
    (static_cast<uint64_t>(1) << ((cpu) % __NCPUBITS))) != 0)
/*
 * Compare two sets for equality. Unlike the glibc macro of the same name, this
 * fallback takes the cpu_set_t objects themselves (lvalues), not pointers.
 */
#define CPU_EQUAL(left, right) (memcmp(&(left), &(right), sizeof(cpu_set_t)) == 0)

#endif
#endif
#endif

namespace tvm {
namespace runtime {
namespace threading {
Expand Down Expand Up @@ -64,21 +85,24 @@ class ThreadGroup {
enum AffinityMode : int {
kBig = 1,  // prefer the big CPU type
kLittle = -1,  // prefer the little CPU type
// Use the CPU list passed to Configure to set the affinity.
// NOTE(review): how the listed CPUs are distributed over worker threads is decided
// in the implementation, which is not visible here - confirm before relying on it.
kSpecify = -2,
};

/*!
* \brief Configure the CPU id affinity of the thread group.
*
* \param mode The preferred CPU type (kBig = 1, kLittle = -1) or kSpecify = -2
* to take the CPUs from `cpus`.
* \param nthreads The number of threads to use (0 = use all).
* \param cpus A list of CPU ids used to set the CPU affinity.
* \param exclude_worker0 Whether to use the main thread as a worker.
* If `true`, worker0 will not be launched in a new thread and
* `worker_callback` will only be called for values >= 1. This
* allows use of the main thread as a worker.
*
* \return The number of workers to use.
*/
int Configure(AffinityMode mode, int nthreads, bool exclude_worker0);
int Configure(AffinityMode mode, int nthreads, std::vector<unsigned int> cpus,
bool exclude_worker0);

private:
Impl* impl_;
Expand All @@ -94,6 +118,8 @@ void Yield();
*/
int MaxConcurrency();

/*!
* \brief Set the maximum concurrency value.
* \param value The new value.
* NOTE(review): presumably this overrides what MaxConcurrency() reports; the
* implementation is not visible here - confirm, including what 0 means.
*/
void SetMaxConcurrency(int value);
huajsj marked this conversation as resolved.
Show resolved Hide resolved

/*!
* \brief Reset the threads in the pool. All current threads are destroyed and
* new ones are created.
Expand All @@ -102,6 +128,9 @@ int MaxConcurrency();
*/
void ResetThreadPool();

/*!
* \brief Configure the calling thread's thread pool.
*
* Passes `max_concurrency` to SetMaxConcurrency and then updates the worker
* configuration of the thread-local pool with `mode`, `nthreads` and `cpus`.
*
* \param mode The affinity mode to apply.
* \param nthreads The number of threads to use.
* \param cpus A list of CPU ids used to set the CPU affinity.
* \param max_concurrency The value handed to SetMaxConcurrency (defaults to 0).
*/
void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
huajsj marked this conversation as resolved.
Show resolved Hide resolved
std::vector<unsigned int> cpus, int max_concurrency = 0);

} // namespace threading
} // namespace runtime
} // namespace tvm
Expand Down
27 changes: 22 additions & 5 deletions src/runtime/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <dmlc/thread_local.h>
#include <tvm/runtime/c_backend_api.h>
#include <tvm/runtime/c_runtime_api.h>
#include <tvm/runtime/container/array.h>
#include <tvm/runtime/logging.h>
#include <tvm/runtime/packed_func.h>
#include <tvm/runtime/registry.h>
Expand Down Expand Up @@ -317,10 +318,11 @@ class ThreadPool {

static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore<ThreadPool>::Get(); }

void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads) {
void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads,
const std::vector<unsigned int>& cpus) {
// this will also reset the affinity of the ThreadGroup
// may use less than the MaxConcurrency number of workers
num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_);
num_workers_used_ = threads_->Configure(mode, nthreads, cpus, exclude_worker0_);
// if MaxConcurrency restricted the number of workers (e.g., due to
// hyperthreading), respect the restriction
num_workers_used_ = std::min(num_workers_, num_workers_used_);
Expand All @@ -337,7 +339,7 @@ class ThreadPool {
new tvm::runtime::threading::ThreadGroup(
num_workers_, [this](int worker_id) { this->RunWorker(worker_id); },
exclude_worker0_ /* include_main_thread */));
num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_);
num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, {}, exclude_worker0_);
huajsj marked this conversation as resolved.
Show resolved Hide resolved
}

// Internal worker function.
Expand Down Expand Up @@ -373,13 +375,28 @@ TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRe
threading::ThreadGroup::AffinityMode mode =
static_cast<threading::ThreadGroup::AffinityMode>(static_cast<int>(args[0]));
int nthreads = args[1];
ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads);
std::vector<unsigned int> cpus;
int max_concurrency = 0;
if (args.num_args == 3) {
Array<String> cpu_array = args[2];
for (auto cpu : cpu_array) {
cpus.push_back(std::stoi(cpu));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify that the string represents a valid integer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you want to do is to restrict tvm thread run on specific cpu core ids? If so, how to handle Python's API interface? For example:

from tvm._ffi import get_global_func
config_threadpool = get_global_func('runtime.config_threadpool')
core_ids = (0, 1, 2)
config_threadpool(0, 1, *core_ids)

?
And how to handle the case where the specified core ids are more than the 2nd argument (i.e. how many threads to launch)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @FrozenGene for the follow-up. If the number of core ids is greater than the number of threads, every thread has its affinity set to all of the CPUs in the CPU list; in the case above, thread 0 will have affinity with CPUs (0, 1, 2). The related logic is in threading_backend.cc, lines 120-129.

Copy link
Member

@FrozenGene FrozenGene Feb 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I asked is how to handle Python's argument-unpacking syntax. If I write the code as in my previous example, your current code cannot handle it, because the unpacked arguments will not be of a list type as on the C++ side. @huajsj

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the misunderstanding. The answers follow.

What you want to do is to restrict tvm thread run on specific cpu core ids?

Yes, restrict the worker thread running on specific cpu core or cpu core groups

how to handle Python's API interface?

the supported use case like following
config_threadpool('-1', 2, ['1', '2', '3'])

And how to handle the specific core ids is greater than the 2nd argument (i.e. how many threads to launch).

In the existing logic, the second parameter is not used to determine how many worker threads to launch; it is used as
a default for how many tasks a parallel run should use when the task number is not set,
and the final task number is the minimum of 'max_concurrency' and this value.
The number of threads launched is determined by 'max_concurrency'; in our solution, this value will be the size of the CPU id list.
Under this solution, for example, when the CPU list is ['2', '8', '9'], nthreads is 2, and exclude_worker0 is true (the default), the following will happen.

  1. mode is 'kSpecifyOneCorePerThread',
    1.1. 3 worker thread get launched , cpu affinity like following
    T1 (2-9)
    T2 (8)
    T3 (9)
    1.2 when run the task
    task1 --> T1
    task2 --> T2
  2. mode is 'kSpecifyThreadShareAllCore',
    2.1 3 worker thread get launched , cpu affinity like following
    T1 (2-9)
    T2 (2-9)
    T3 (2-9)
    2.2 when run the task
    task1 --> T1
    task2 --> T2

}
}
if (args.num_args == 4) {
max_concurrency = args[3];
}
huajsj marked this conversation as resolved.
Show resolved Hide resolved
threading::Configure(mode, nthreads, cpus, max_concurrency);
});

namespace threading {
/*! \brief Reset the thread pool that belongs to the calling thread. */
void ResetThreadPool() {
  auto* pool = tvm::runtime::ThreadPool::ThreadLocal();
  pool->Reset();
}

/*!
 * \brief Set the maximum concurrency, then reconfigure the calling thread's pool.
 * \param mode The affinity mode forwarded to the pool.
 * \param nthreads The number of threads forwarded to the pool.
 * \param cpus The CPU id list forwarded to the pool.
 * \param max_concurrency The value passed to SetMaxConcurrency.
 */
void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
               std::vector<unsigned int> cpus, int max_concurrency) {
  // The concurrency value is set before the worker configuration is updated.
  tvm::runtime::threading::SetMaxConcurrency(max_concurrency);
  auto* pool = tvm::runtime::ThreadPool::ThreadLocal();
  pool->UpdateWorkerConfiguration(mode, nthreads, cpus);
}
}  // namespace threading

} // namespace runtime
} // namespace tvm

Expand Down
Loading