Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Runtime][ThreadPool]Refactor affinity function and support CPU affinity list setting. #9802

Merged
merged 11 commits into from
Mar 1, 2022
48 changes: 43 additions & 5 deletions include/tvm/runtime/threading_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@
#include <memory>
#include <vector>

#if defined(__linux__) || defined(__ANDROID__)
#if defined(__ANDROID__)
#ifndef CPU_SET
/*
 * Fallback cpu_set_t and CPU_* macros, defined only on Android when the
 * toolchain headers have not already provided CPU_SET.
 *
 * The set is a bitmap of CPU_SETSIZE bits stored in 64-bit words. Every mask is
 * built from a 64-bit one: `1UL` is only 32 bits wide on 32-bit targets, where
 * shifting it by up to 63 positions would be undefined behavior.
 */
#define CPU_SETSIZE 1024
#define __NCPUBITS (8 * sizeof(uint64_t))
typedef struct {
  uint64_t __bits[CPU_SETSIZE / __NCPUBITS];
} cpu_set_t;

/* Mark `cpu` as a member of the set pointed to by `cpusetp`. */
#define CPU_SET(cpu, cpusetp) \
  ((cpusetp)->__bits[(cpu) / __NCPUBITS] |= (static_cast<uint64_t>(1) << ((cpu) % __NCPUBITS)))
/* Clear every bit of the set pointed to by `cpusetp`. */
#define CPU_ZERO(cpusetp) memset((cpusetp), 0, sizeof(cpu_set_t))
/*
 * Evaluate to true when `cpu` is a member of the set pointed to by `cpusetp`.
 * The whole expansion is parenthesized so that it composes with `!`, `&&`, etc.
 */
#define CPU_ISSET(cpu, cpusetp) \
  ((((cpusetp)->__bits[(cpu) / __NCPUBITS] >> ((cpu) % __NCPUBITS)) & static_cast<uint64_t>(1)) != 0)
/* Compare two cpu_set_t objects (passed as objects, not pointers) bit for bit. */
#define CPU_EQUAL(left, right) (memcmp(&(left), &(right), sizeof(cpu_set_t)) == 0)

#endif  // CPU_SET
#endif  // __ANDROID__
#endif  // __linux__ || __ANDROID__

namespace tvm {
namespace runtime {
namespace threading {
Expand Down Expand Up @@ -64,21 +85,26 @@ class ThreadGroup {
/*! \brief CPU affinity modes accepted by Configure. */
enum AffinityMode : int {
  /*! \brief Preferred CPU type: big. */
  kBig = 1,
  /*! \brief Preferred CPU type: little. */
  kLittle = -1,
  /*! \brief Different threads will get different affinities. */
  kSpecifyOneCorePerThread = -2,
  /*! \brief All threads will get the same core group affinity. */
  kSpecifyThreadShareAllCore = -3
};

/*!
 * \brief configure the CPU id affinity
 *
 * \param mode The preferred CPU type (1 = big, -1 = little,
 *        -2 = kSpecifyOneCorePerThread, -3 = kSpecifyThreadShareAllCore).
 * \param nthreads The number of threads to use (0 = use all).
 * \param exclude_worker0 Whether to use the main thread as a worker.
 *        If `true`, worker0 will not be launched in a new thread and
 *        `worker_callback` will only be called for values >= 1. This
 *        allows use of the main thread as a worker.
 * \param cpus A list of CPU ids used to set the 'cpu affinity'. Defaults to an
 *        empty list, so existing three-argument callers keep working.
 *
 * \return The number of workers to use.
 */
int Configure(AffinityMode mode, int nthreads, bool exclude_worker0,
              std::vector<unsigned int> cpus = {});

private:
Impl* impl_;
Expand All @@ -88,12 +114,14 @@ class ThreadGroup {
* \brief Platform-agnostic no-op.
*/
void Yield();

/*!
 * \brief Query the worker limit of this system.
 * \return the maximum number of effective workers for this system.
 */
int MaxConcurrency();

/*!
 * \brief Setting the maximum number of available cores.
 * \param value The maximum number of cores to make available.
 */
void SetMaxConcurrency(int value);
/*!
* \brief Reset the threads in the pool. All current threads are destroyed and
* new ones are created.
Expand All @@ -102,6 +130,16 @@ int MaxConcurrency();
*/
void ResetThreadPool();

/*!
* \brief Configuring the CPU affinity mode for the working threads.
* \param mode The preferred CPU type (1 = big, -1 = little, -2 = kSpecifyOneCorePerThread,
* -3 = kSpecifyThreadShareAllCore).
* \param nthreads The number of threads to use (0 = use all).
* \param cpus A list of CPUs is used to set the 'cpu affinity' for the worker threads.
*/
void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
huajsj marked this conversation as resolved.
Show resolved Hide resolved
std::vector<unsigned int> cpus);

} // namespace threading
} // namespace runtime
} // namespace tvm
Expand Down
38 changes: 33 additions & 5 deletions src/runtime/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <dmlc/thread_local.h>
#include <tvm/runtime/c_backend_api.h>
#include <tvm/runtime/c_runtime_api.h>
#include <tvm/runtime/container/array.h>
#include <tvm/runtime/logging.h>
#include <tvm/runtime/packed_func.h>
#include <tvm/runtime/registry.h>
Expand All @@ -42,12 +43,13 @@
#include <thread>
#include <vector>

#include "../support/utils.h"
// Size in bytes named after the L1 cache; presumably the cache-line size used
// for alignment/padding -- confirm at its use sites (not visible here).
constexpr int kL1CacheBytes = 64;

namespace tvm {
namespace runtime {
namespace {

using support::IsNumber;
// Default spin count; presumably the fallback returned by GetSpinCount when no
// override is configured -- confirm against its body (not visible here).
constexpr uint32_t kDefaultSpinCount{300000};

uint32_t GetSpinCount() {
Expand Down Expand Up @@ -317,10 +319,11 @@ class ThreadPool {

static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore<ThreadPool>::Get(); }

void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads) {
void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads,
const std::vector<unsigned int>& cpus) {
// this will also reset the affinity of the ThreadGroup
// may use less than the MaxConcurrency number of workers
num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_);
num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_, cpus);
// if MaxConcurrency restricted the number of workers (e.g., due to
// hyperthreading), respect the restriction
num_workers_used_ = std::min(num_workers_, num_workers_used_);
Expand Down Expand Up @@ -369,17 +372,42 @@ class ThreadPool {
std::unique_ptr<tvm::runtime::threading::ThreadGroup> threads_;
};

/*!
 * \brief args[0] is the AffinityMode, args[1] is the number of threads.
 * args[2] (optional) is a list of CPU ids, given as strings, used to set the CPU affinity.
 */
TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRetValue* rv) {
threading::ThreadGroup::AffinityMode mode =
static_cast<threading::ThreadGroup::AffinityMode>(static_cast<int>(args[0]));
int nthreads = args[1];
ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads);
std::vector<unsigned int> cpus;
if (args.num_args >= 3) {
Array<String> cpu_array = args[2];
for (auto cpu : cpu_array) {
ICHECK(IsNumber(cpu)) << "The CPU core information '" << cpu << "' is not a number.";
cpus.push_back(std::stoi(cpu));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify that the string represents a valid integer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the goal to restrict TVM threads to run on specific CPU core ids? If so, how is this handled through the Python API? For example:

from tvm._ffi import get_global_func
config_threadpool = get_global_func('runtime.config_threadpool')
core_ids = (0, 1, 2)
config_threadpool(0, 1, *core_ids)

?
And how is the case handled where the number of specified core ids is greater than the 2nd argument (i.e. how many threads to launch)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @FrozenGene for the follow-up. If the number of core ids is greater than the number of threads, every thread's affinity is set to all of the CPUs in the CPU list; in that case, thread 0 has affinity with CPUs (0, 1, 2). The related logic is in threading_backend.cc, lines 120-129.

Copy link
Member

@FrozenGene FrozenGene Feb 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I asked is how Python's argument-unpacking syntax is handled. If I write the code as shown previously, your current code cannot handle it, because the unpacked arguments are not passed as a list type as the C++ code expects. @huajsj

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the misunderstanding. The answers are as follows:

What you want to do is to restrict tvm thread run on specific cpu core ids?

Yes, restrict the worker thread running on specific cpu core or cpu core groups

how to handle Python's API interface?

The supported use case looks like the following:
config_threadpool('-1', 2, ['1', '2', '3'])

And how to handle the specific core ids is greater than the 2nd argument(i.e. how many threads to lauch).

In the existing logic, the second parameter is not used to determine how many worker threads to launch; it is used as
a default for how many tasks a parallel run should use when the task number is not set,
and the final task number is the minimum of 'max_concurrency' and this value.
The number of threads launched is determined by 'max_concurrency'; in our solution, this value will be the size of the CPU id list.
Under this solution, for example, when the CPU list is ['2', '8', '9'], nthreads is 2, and exclude_worker0 is true (the default), the following will happen.

  1. mode is 'kSpecifyOneCorePerThread',
    1.1. 3 worker threads get launched, with CPU affinity as follows:
    T1 (2-9)
    T2 (8)
    T3 (9)
    1.2 when run the task
    task1 --> T1
    task2 --> T2
  2. mode is 'kSpecifyThreadShareAllCore',
    2.1. 3 worker threads get launched, with CPU affinity as follows:
    T1 (2-9)
    T2 (2-9)
    T3 (2-9)
    2.2 when run the task
    task1 --> T1
    task2 --> T2

std::cout << "cpu is " << cpu << std::endl;
}
}
threading::Configure(mode, nthreads, cpus);
});

namespace threading {
/*!
 * \brief Reset the thread-local ThreadPool instance.
 */
void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); }

/*!
 * \brief configure the CPU id affinity
 * \param mode The preferred CPU type (1 = big, -1 = little,
 *  -2 = kSpecifyOneCorePerThread, -3 = kSpecifyThreadShareAllCore).
 * \param nthreads The number of threads to use (0 = use all).
 * \param cpus A list of CPUs used to set the 'cpu affinity' for the worker threads.
 *
 */
void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
               std::vector<unsigned int> cpus) {
  // The number of listed CPUs is passed on as the new maximum concurrency; an
  // empty list therefore passes 0. The conversion to int is made explicit.
  tvm::runtime::threading::SetMaxConcurrency(static_cast<int>(cpus.size()));
  // Apply the mode, thread count and CPU list to the thread-local pool.
  tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus);
}
}  // namespace threading

} // namespace runtime
} // namespace tvm

Expand Down
Loading