Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set the scheduling policy individually per thread #135

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/crucible/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace crucible {
Task() = default;

/// Create Task object containing closure and description.
Task(string title, function<void()> exec_fn);
Task(string title, int policy, function<void()> exec_fn);

/// Schedule Task for at most one future execution.
/// May run Task in current thread or in other thread.
Expand Down
23 changes: 17 additions & 6 deletions lib/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace crucible {
};

class TaskState : public enable_shared_from_this<TaskState> {
const int m_sched_policy;
mutex m_mutex;
const function<void()> m_exec_fn;
const string m_title;
Expand Down Expand Up @@ -93,7 +94,7 @@ namespace crucible {

public:
~TaskState();
TaskState(string title, function<void()> exec_fn);
TaskState(string title, int policy, function<void()> exec_fn);

/// Run the task at most one more time. If task has
/// already started running, a new instance is scheduled.
Expand Down Expand Up @@ -206,7 +207,7 @@ namespace crucible {
} else {
// If there are multiple tasks, create a new task to wrap our post-exec queue,
// then push it to the front of the global queue using normal locking methods.
TaskStatePtr rescue_task(make_shared<TaskState>("rescue_task", [](){}));
TaskStatePtr rescue_task(make_shared<TaskState>("rescue_task", SCHED_IDLE, [](){})); //TODO(kakra) Check prio
swap(rescue_task->m_post_exec_queue, queue);
TaskQueue tq_one { rescue_task };
TaskMasterState::push_front(tq_one);
Expand All @@ -220,7 +221,8 @@ namespace crucible {
--s_instance_count;
}

TaskState::TaskState(string title, function<void()> exec_fn) :
TaskState::TaskState(string title, int policy, function<void()> exec_fn) :
m_sched_policy(policy),
m_exec_fn(exec_fn),
m_title(title),
m_id(++s_next_id)
Expand Down Expand Up @@ -302,10 +304,17 @@ namespace crucible {
DIE_IF_MINUS_ERRNO(pthread_getname_np(pthread_self(), buf, sizeof(buf)));
DIE_IF_MINUS_ERRNO(pthread_setname_np(pthread_self(), m_title.c_str()));

int policy = SCHED_OTHER;
sched_param param = { .sched_priority = 0 };
DIE_IF_MINUS_ERRNO(pthread_getschedparam(pthread_self(), &policy, &param));

catch_all([&]() {
sched_param param = { .sched_priority = 0 };
pthread_setschedparam(pthread_self(), m_sched_policy, &param);
m_exec_fn();
});

pthread_setschedparam(pthread_self(), policy, &param);
pthread_setname_np(pthread_self(), buf);

lock.lock();
Expand Down Expand Up @@ -585,6 +594,8 @@ namespace crucible {
void
TaskMasterState::loadavg_thread_fn()
{
sched_param param = { .sched_priority = 0 };
pthread_setschedparam(pthread_self(), SCHED_ISO, &param);
pthread_setname_np(pthread_self(), "load_tracker");
while (!m_cancelled) {
adjust_thread_count();
Expand Down Expand Up @@ -627,8 +638,8 @@ namespace crucible {
{
}

Task::Task(string title, function<void()> exec_fn) :
m_task_state(make_shared<TaskState>(title, exec_fn))
Task::Task(string title, int policy, function<void()> exec_fn) :
m_task_state(make_shared<TaskState>(title, policy, exec_fn))
{
}

Expand Down Expand Up @@ -867,7 +878,7 @@ namespace crucible {
}

ExclusionState::ExclusionState(const string &title) :
m_task(title, [](){})
m_task(title, SCHED_IDLE, [](){}) //TODO(kakra) Check prio
{
}

Expand Down
1 change: 0 additions & 1 deletion scripts/[email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ After=sysinit.target
Type=simple
ExecStart=@PREFIX@/sbin/beesd --no-timestamps %i
CPUAccounting=true
CPUSchedulingPolicy=batch
CPUWeight=12
IOSchedulingClass=idle
IOSchedulingPriority=7
Expand Down
6 changes: 3 additions & 3 deletions src/bees-context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ BeesContext::scan_one_extent(const BeesFileRange &bfr, const Extent &e)
// Prealloc is all zero and we replace it with a hole.
// No special handling is required here. Nuke it and move on.
Task(
"dedup_prealloc",
"dedup_prealloc", SCHED_IDLE,
[m_ctx, bfr, e]() {
BEESLOGINFO("prealloc extent " << e);
// Must not extend past EOF
Expand Down Expand Up @@ -980,10 +980,10 @@ BeesContext::start()

m_progress_thread = make_shared<BeesThread>("progress_report");
m_status_thread = make_shared<BeesThread>("status_report");
m_progress_thread->exec([=]() {
m_progress_thread->exec(SCHED_OTHER, [=]() {
show_progress();
});
m_status_thread->exec([=]() {
m_status_thread->exec(SCHED_OTHER, [=]() {
dump_status();
});

Expand Down
8 changes: 4 additions & 4 deletions src/bees-hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,13 +760,13 @@ BeesHashTable::BeesHashTable(shared_ptr<BeesContext> ctx, string filename, off_t

m_extent_metadata.resize(m_extents);

m_writeback_thread.exec([&]() {
m_writeback_thread.exec(SCHED_IDLE, [&]() {
writeback_loop();
});
});

m_prefetch_thread.exec([&]() {
m_prefetch_thread.exec(SCHED_IDLE, [&]() {
prefetch_loop();
});
});

// Blacklist might fail if the hash table is not stored on a btrfs
catch_all([&]() {
Expand Down
8 changes: 4 additions & 4 deletions src/bees-roots.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ BeesRoots::crawl_batch(shared_ptr<BeesCrawl> this_crawl)
auto this_hold = this_crawl->hold_state(this_range);
auto shared_this_copy = shared_from_this();
BEESNOTE("Starting task " << this_range);
Task(task_title, [ctx_copy, this_hold, this_range, shared_this_copy]() {
Task(task_title, SCHED_BATCH, [ctx_copy, this_hold, this_range, shared_this_copy]() {
BEESNOTE("scan_forward " << this_range);
ctx_copy->scan_forward(this_range);
shared_this_copy->crawl_state_set_dirty();
Expand Down Expand Up @@ -382,7 +382,7 @@ BeesRoots::crawl_thread()

// Create the Task that does the crawling
auto shared_this = shared_from_this();
m_crawl_task = Task("crawl_master", [shared_this]() {
m_crawl_task = Task("crawl_master", SCHED_IDLE, [shared_this]() {
auto tqs = TaskMaster::get_queue_count();
BEESNOTE("queueing extents to scan, " << tqs << " of " << BEES_MAX_QUEUE_SIZE);
bool run_again = true;
Expand Down Expand Up @@ -572,7 +572,7 @@ BeesRoots::BeesRoots(shared_ptr<BeesContext> ctx) :
void
BeesRoots::start()
{
m_crawl_thread.exec([&]() {
m_crawl_thread.exec(SCHED_IDLE, [&]() {
// Measure current transid before creating any crawlers
catch_all([&]() {
m_transid_re.update(transid_max_nocache());
Expand All @@ -583,7 +583,7 @@ BeesRoots::start()
state_load();
});

m_writeback_thread.exec([&]() {
m_writeback_thread.exec(SCHED_IDLE, [&]() {
writeback_thread();
});
crawl_thread();
Expand Down
10 changes: 6 additions & 4 deletions src/bees-thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,30 @@ BeesThread::BeesThread(string name) :
}

void
BeesThread::exec(function<void()> func)
BeesThread::exec(int policy, function<void()> func)
{
m_timer.reset();
BEESLOGDEBUG("BeesThread exec " << m_name);
m_thread_ptr = make_shared<thread>([=]() {
BeesNote::set_name(m_name);
BEESLOGDEBUG("Starting thread " << m_name);
BEESLOGDEBUG("Starting thread " << m_name << " policy " << policy);
BEESNOTE("thread function");
Timer thread_time;
catch_all([&]() {
sched_param param = { .sched_priority = 0 };
pthread_setschedparam(pthread_self(), policy, &param);
func();
});
BEESLOGDEBUG("Exiting thread " << m_name << ", " << thread_time << " sec");
});
}

BeesThread::BeesThread(string name, function<void()> func) :
BeesThread::BeesThread(string name, int policy, function<void()> func) :
m_name(name)
{
THROW_CHECK1(invalid_argument, name, !name.empty());
BEESLOGDEBUG("BeesThread construct " << m_name);
exec(func);
exec(policy, func);
}

void
Expand Down
19 changes: 19 additions & 0 deletions src/bees-trace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ BeesNote::BeesNote(function<void(ostream &os)> f) :
m_func(f)
{
m_name = get_name();
m_policy = get_policy();
m_prev = tl_next;
tl_next = this;
unique_lock<mutex> lock(s_mutex);
Expand Down Expand Up @@ -149,6 +150,16 @@ BeesNote::get_name()
return buf;
}

int
BeesNote::get_policy()
{
int policy = SCHED_OTHER;
sched_param param = { .sched_priority = 0 };
pthread_getschedparam(pthread_self(), &policy, &param);

return policy;
}

BeesNote::ThreadStatusMap
BeesNote::get_status()
{
Expand All @@ -162,6 +173,14 @@ BeesNote::get_status()
if (t.second->m_timer.age() > BEES_TOO_LONG) {
oss << "[" << t.second->m_timer << "s] ";
}
switch (t.second->m_policy) {
case SCHED_BATCH:
oss << "[BT] ";
break;
case SCHED_IDLE:
oss << "[ID] ";
break;
}
t.second->m_func(oss);
rv[t.first] = oss.str();
}
Expand Down
6 changes: 4 additions & 2 deletions src/bees.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class BeesTracer {
class BeesNote {
function<void(ostream &)> m_func;
BeesNote *m_prev;
int m_policy;
Timer m_timer;
string m_name;

Expand All @@ -225,6 +226,7 @@ class BeesNote {

static void set_name(const string &name);
static string get_name();
static int get_policy();
};

// C++ threads dumbed down even further
Expand All @@ -236,8 +238,8 @@ class BeesThread {
public:
~BeesThread();
BeesThread(string name);
BeesThread(string name, function<void()> args);
void exec(function<void()> args);
BeesThread(string name, int policy, function<void()> args);
void exec(int policy, function<void()> args);
void join();
void set_name(const string &name);
};
Expand Down
8 changes: 4 additions & 4 deletions test/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ test_tasks(size_t count)
ostringstream oss;
oss << "task #" << c;
Task t(
oss.str(),
oss.str(), SCHED_OTHER,
[c, &task_done, &mtx, &cv]() {
unique_lock<mutex> lock(mtx);
// cerr << "Task #" << c << endl;
Expand Down Expand Up @@ -101,7 +101,7 @@ test_barrier(size_t count)
ostringstream oss;
oss << "task #" << c;
Task t(
oss.str(),
oss.str(), SCHED_OTHER,
[c, &task_done, &mtx, bl]() mutable {
// cerr << "Task #" << c << endl;
unique_lock<mutex> lock(mtx);
Expand All @@ -120,7 +120,7 @@ test_barrier(size_t count)
bool done_flag = false;

Task completed(
"Waiting for Barrier",
"Waiting for Barrier", SCHED_OTHER,
[&mtx, &cv, &done_flag]() {
unique_lock<mutex> lock(mtx);
// cerr << "Running cv notify" << endl;
Expand Down Expand Up @@ -170,7 +170,7 @@ test_exclusion(size_t count)
ostringstream oss;
oss << "task #" << c;
Task t(
oss.str(),
oss.str(), SCHED_OTHER,
[c, &only_one, excl, &lock_success_count, &lock_failure_count, &pings, &tasks_running, &cv, &mtx]() mutable {
// cerr << "Task #" << c << endl;
(void)c;
Expand Down