Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: use notify instead of polling for UnorderedSourceOp #8872

Merged
merged 21 commits into from
Mar 29, 2024
Merged
5 changes: 4 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"pipeline scheduler", \
Gauge, \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
F(type_wait_for_notify_tasks_count, {"type", "wait_for_notify_tasks_count"}), \
F(type_cpu_pending_tasks_count, {"type", "cpu_pending_tasks_count"}), \
F(type_cpu_executing_tasks_count, {"type", "cpu_executing_tasks_count"}), \
F(type_io_pending_tasks_count, {"type", "io_pending_tasks_count"}), \
Expand All @@ -748,7 +749,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_io_execute, {{"type", "io_execute"}}, ExpBuckets{0.005, 2, 20}), \
F(type_cpu_queue, {{"type", "cpu_queue"}}, ExpBuckets{0.005, 2, 20}), \
F(type_io_queue, {{"type", "io_queue"}}, ExpBuckets{0.005, 2, 20}), \
F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20})) \
F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20}), \
F(type_wait_for_notify, {{"type", "wait_for_notify"}}, ExpBuckets{0.005, 2, 20})) \
M(tiflash_pipeline_task_execute_max_time_seconds_per_round, \
"Bucketed histogram of pipeline task execute max time per round in seconds", \
Histogram, /* these command usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \
Expand All @@ -758,6 +760,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"pipeline task change to status", \
Counter, \
F(type_to_waiting, {"type", "to_waiting"}), \
F(type_to_wait_for_notify, {"type", "to_wait_for_notify"}), \
F(type_to_running, {"type", "to_running"}), \
F(type_to_io, {"type", "to_io"}), \
F(type_to_finished, {"type", "to_finished"}), \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ extern const char random_pipeline_model_execute_suffix_failpoint[];
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For unexpected status, an immediate return is required. */ \
/* For other status, an immediate return is required. */ \
default: \
return (op_status); \
}
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/setThreadName.h>
#include <Flash/Pipeline/Schedule/Reactor/WaitReactor.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>
#include <Flash/Pipeline/Schedule/Tasks/TaskHelper.h>
#include <common/logger_useful.h>
#include <errno.h>
Expand All @@ -29,6 +30,7 @@ WaitReactor::WaitReactor(TaskScheduler & scheduler_)
: scheduler(scheduler_)
{
GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count).Set(0);
GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count).Set(0);
thread = std::thread(&WaitReactor::loop, this);
}

Expand All @@ -50,6 +52,9 @@ bool WaitReactor::awaitAndCollectReadyTask(WaitingTask && task)
task_ptr->profile_info.elapsedAwaitTime();
io_tasks.push_back(std::move(task.first));
return true;
case ExecTaskStatus::WAIT_FOR_NOTIFY:
registerTaskToFuture(std::move(task.first));
return true;
case FINISH_STATUS:
task_ptr->profile_info.elapsedAwaitTime();
task_ptr->startTraceMemory();
Expand Down Expand Up @@ -161,8 +166,8 @@ void WaitReactor::react(WaitingTasks & local_waiting_tasks)
++task_it;
}

#ifdef __APPLE__
auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count);
#if __APPLE__ && __clang__
__thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count);
#else
thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count);
#endif
Expand Down
42 changes: 42 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>

namespace DB
{
#if __APPLE__ && __clang__
__thread NotifyFuturePtr current_notify_future = nullptr;
#else
thread_local NotifyFuturePtr current_notify_future = nullptr;
#endif

/// Stores `new_future` in the calling thread's `current_notify_future` slot.
/// The slot must be empty on entry; this is checked by an assert only.
void setNotifyFuture(NotifyFuturePtr new_future)
{
    assert(!current_notify_future);
    current_notify_future = std::move(new_future);
}

/// Empties the calling thread's `current_notify_future` slot, releasing the
/// reference it held (a no-op when the slot is already empty).
void clearNotifyFuture()
{
    current_notify_future = nullptr;
}

/// Hands `task` to the future stored in the calling thread's
/// `current_notify_future` slot, then empties the slot.
/// The slot must hold a future on entry; this is checked by an assert only.
void registerTaskToFuture(TaskPtr && task)
{
    assert(current_notify_future);
    auto & pending_future = *current_notify_future;
    pending_future.registerTask(std::move(task));
    // The slot is cleared only after the task has been handed over.
    current_notify_future = nullptr;
}
} // namespace DB
39 changes: 39 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/Tasks/Task.h>

namespace DB
{
/// Abstract interface exposing a single hook, registerTask(), that receives a task.
/// NOTE(review): presumably implementations hold the task until some event
/// makes it runnable again — confirm against the concrete implementers.
struct NotifyFuture
{
NotifyFuture() = default;
// Virtual so that destroying an implementation through a NotifyFuture pointer is well-defined.
virtual ~NotifyFuture() = default;
/// Takes `task` by rvalue reference. Pure virtual: what happens to the task
/// is defined entirely by the implementation.
virtual void registerTask(TaskPtr && task) = 0;
};
using NotifyFuturePtr = std::shared_ptr<NotifyFuture>;

#if __APPLE__ && __clang__
extern __thread NotifyFuturePtr current_notify_future;
#else
extern thread_local NotifyFuturePtr current_notify_future;
#endif

void setNotifyFuture(NotifyFuturePtr new_future);
void clearNotifyFuture();
void registerTaskToFuture(TaskPtr && task);

} // namespace DB
100 changes: 100 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Flash/Pipeline/Schedule/Tasks/Task.h>

#include <deque>

namespace DB
{
class PipeConditionVariable
{
public:
inline void registerTask(TaskPtr && task)
{
assert(task);
assert(task->getStatus() == ExecTaskStatus::WAIT_FOR_NOTIFY);
{
std::lock_guard lock(mu);
tasks.push_back(std::move(task));
}

#if __APPLE__ && __clang__
__thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#else
thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#endif
metrics.Increment();
}

inline void notifyOne()
{
TaskPtr task;
{
std::lock_guard lock(mu);
if (tasks.empty())
return;
task = std::move(tasks.front());
tasks.pop_front();
}
assert(task);
notifyTaskDirectly(std::move(task));

#if __APPLE__ && __clang__
__thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#else
thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#endif
metrics.Decrement();
}

inline void notifyAll()
{
std::deque<TaskPtr> cur_tasks;
{
std::lock_guard lock(mu);
std::swap(cur_tasks, tasks);
}
size_t tasks_cnt = cur_tasks.size();
while (!cur_tasks.empty())
{
notifyTaskDirectly(std::move(cur_tasks.front()));
cur_tasks.pop_front();
}

#if __APPLE__ && __clang__
__thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#else
thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#endif
metrics.Decrement(tasks_cnt);
}

static inline void notifyTaskDirectly(TaskPtr && task)
{
assert(task);
task->notify();
task->profile_info.elapsedWaitForNotifyTime();
assert(TaskScheduler::instance);
TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task));
}

private:
std::mutex mu;
std::deque<TaskPtr> tasks;
};
} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class PipelineTask

void doFinalizeImpl() override
{
runFinalize(profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs() + getScheduleDuration());
runFinalize(
profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs()
+ profile_info.getWaitForNotifyTimeNs() + getScheduleDuration());
}
};
} // namespace DB
44 changes: 24 additions & 20 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,30 @@ namespace DB
/// - OperatorStatus::WAITING ==> ExecTaskStatus::WAITING
/// - OperatorStatus::NEED_INPUT/HAS_OUTPUT ==> ExecTaskStatus::RUNNING

#define MAP_NOT_RUNNING_TASK_STATUS \
case OperatorStatus::FINISHED: \
{ \
return ExecTaskStatus::FINISHED; \
} \
case OperatorStatus::CANCELLED: \
{ \
return ExecTaskStatus::CANCELLED; \
} \
case OperatorStatus::IO_IN: \
{ \
return ExecTaskStatus::IO_IN; \
} \
case OperatorStatus::IO_OUT: \
{ \
return ExecTaskStatus::IO_OUT; \
} \
case OperatorStatus::WAITING: \
{ \
return ExecTaskStatus::WAITING; \
#define MAP_NOT_RUNNING_TASK_STATUS \
case OperatorStatus::FINISHED: \
{ \
return ExecTaskStatus::FINISHED; \
} \
case OperatorStatus::CANCELLED: \
{ \
return ExecTaskStatus::CANCELLED; \
} \
case OperatorStatus::IO_IN: \
{ \
return ExecTaskStatus::IO_IN; \
} \
case OperatorStatus::IO_OUT: \
{ \
return ExecTaskStatus::IO_OUT; \
} \
case OperatorStatus::WAITING: \
{ \
return ExecTaskStatus::WAITING; \
} \
case OperatorStatus::WAIT_FOR_NOTIFY: \
{ \
return ExecTaskStatus::WAIT_FOR_NOTIFY; \
}

#define UNEXPECTED_OP_STATUS(op_status, function_name) \
Expand Down
Loading