Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: use notify instead of polling for UnorderedSourceOp #8872

Merged
merged 21 commits into from
Mar 29, 2024
Merged
5 changes: 4 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"pipeline scheduler", \
Gauge, \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
F(type_wait_for_notify_tasks_count, {"type", "wait_for_notify_tasks_count"}), \
F(type_cpu_pending_tasks_count, {"type", "cpu_pending_tasks_count"}), \
F(type_cpu_executing_tasks_count, {"type", "cpu_executing_tasks_count"}), \
F(type_io_pending_tasks_count, {"type", "io_pending_tasks_count"}), \
Expand All @@ -735,7 +736,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_io_execute, {{"type", "io_execute"}}, ExpBuckets{0.005, 2, 20}), \
F(type_cpu_queue, {{"type", "cpu_queue"}}, ExpBuckets{0.005, 2, 20}), \
F(type_io_queue, {{"type", "io_queue"}}, ExpBuckets{0.005, 2, 20}), \
F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20})) \
F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20}), \
F(type_wait_for_notify, {{"type", "wait_for_notify"}}, ExpBuckets{0.005, 2, 20})) \
M(tiflash_pipeline_task_execute_max_time_seconds_per_round, \
"Bucketed histogram of pipeline task execute max time per round in seconds", \
Histogram, /* these commands usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \
Expand All @@ -745,6 +747,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"pipeline task change to status", \
Counter, \
F(type_to_waiting, {"type", "to_waiting"}), \
F(type_to_wait_for_notify, {"type", "to_wait_for_notify"}), \
F(type_to_running, {"type", "to_running"}), \
F(type_to_io, {"type", "to_io"}), \
F(type_to_finished, {"type", "to_finished"}), \
Expand Down
40 changes: 22 additions & 18 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ extern const char random_pipeline_model_execute_suffix_failpoint[];
} // namespace FailPoints

#define HANDLE_OP_STATUS(op, op_status, expect_status) \
switch (op_status) \
switch ((op_status).status) \
{ \
/* For the expected status, it will not return here, */ \
/* but instead return control to the macro caller, */ \
Expand All @@ -41,14 +41,14 @@ extern const char random_pipeline_model_execute_suffix_failpoint[];
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For unexpected status, an immediate return is required. */ \
/* For other status, an immediate return is required. */ \
default: \
return (op_status); \
}

#define HANDLE_LAST_OP_STATUS(op, op_status) \
assert(op); \
switch (op_status) \
switch ((op_status).status) \
{ \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
case OperatorStatus::IO_IN: \
Expand Down Expand Up @@ -89,12 +89,12 @@ void PipelineExec::executeSuffix()
source_op->operateSuffix();
}

OperatorStatus PipelineExec::execute()
ReturnOpStatus PipelineExec::execute()
{
auto op_status = executeImpl();
#ifndef NDEBUG
// `NEED_INPUT` means that pipeline_exec needs data to do the calculations and expects the next call to `execute`.
assertOperatorStatus(op_status, {OperatorStatus::FINISHED, OperatorStatus::NEED_INPUT});
assertOperatorStatus(op_status.status, {OperatorStatus::FINISHED, OperatorStatus::NEED_INPUT});
#endif
return op_status;
}
Expand All @@ -105,13 +105,13 @@ OperatorStatus PipelineExec::execute()
* │ block
* write◄────transform◄─── ... ◄───transform◄────────────┘
*/
OperatorStatus PipelineExec::executeImpl()
ReturnOpStatus PipelineExec::executeImpl()
{
Block block;
size_t start_transform_op_index = 0;
auto op_status = fetchBlock(block, start_transform_op_index);
// If the status `fetchBlock` returns isn't `HAS_OUTPUT`, it means that `fetchBlock` did not return a block.
if (op_status != OperatorStatus::HAS_OUTPUT)
if (op_status.status != OperatorStatus::HAS_OUTPUT)
return op_status;

// start from the next transform op after fetched block transform op.
Expand All @@ -127,7 +127,7 @@ OperatorStatus PipelineExec::executeImpl()
}

// Try to fetch a block from transform_ops and source_op.
OperatorStatus PipelineExec::fetchBlock(Block & block, size_t & start_transform_op_index)
ReturnOpStatus PipelineExec::fetchBlock(Block & block, size_t & start_transform_op_index)
{
auto op_status = sink_op->prepare();
HANDLE_OP_STATUS(sink_op, op_status, OperatorStatus::NEED_INPUT);
Expand All @@ -144,44 +144,48 @@ OperatorStatus PipelineExec::fetchBlock(Block & block, size_t & start_transform_
HANDLE_LAST_OP_STATUS(source_op, op_status);
}

OperatorStatus PipelineExec::executeIO()
ReturnOpStatus PipelineExec::executeIO()
{
auto op_status = executeIOImpl();
#ifndef NDEBUG
// `NEED_INPUT` means that pipeline_exec needs data to do the calculations and expects the next call to `execute`.
// `HAS_OUTPUT` means that pipeline_exec has data to do the calculations and expects the next call to `execute`.
assertOperatorStatus(op_status, {OperatorStatus::FINISHED, OperatorStatus::HAS_OUTPUT, OperatorStatus::NEED_INPUT});
assertOperatorStatus(
op_status.status,
{OperatorStatus::FINISHED, OperatorStatus::HAS_OUTPUT, OperatorStatus::NEED_INPUT});
#endif
return op_status;
}
OperatorStatus PipelineExec::executeIOImpl()
ReturnOpStatus PipelineExec::executeIOImpl()
{
assert(io_op);
auto op_status = io_op->executeIO();
if (op_status == OperatorStatus::WAITING)
if (op_status.status == OperatorStatus::WAITING)
fillAwaitable(io_op);
if (op_status != OperatorStatus::IO_IN && op_status != OperatorStatus::IO_OUT)
if (op_status.status != OperatorStatus::IO_IN && op_status.status != OperatorStatus::IO_OUT)
io_op = nullptr;
return op_status;
}

OperatorStatus PipelineExec::await()
ReturnOpStatus PipelineExec::await()
{
auto op_status = awaitImpl();
#ifndef NDEBUG
// `HAS_OUTPUT` means that pipeline_exec has data to do the calculations and expects the next call to `execute`.
// `NEED_INPUT` means that pipeline_exec needs data to do the calculations and expects the next call to `execute`.
assertOperatorStatus(op_status, {OperatorStatus::FINISHED, OperatorStatus::HAS_OUTPUT, OperatorStatus::NEED_INPUT});
assertOperatorStatus(
op_status.status,
{OperatorStatus::FINISHED, OperatorStatus::HAS_OUTPUT, OperatorStatus::NEED_INPUT});
#endif
return op_status;
}
OperatorStatus PipelineExec::awaitImpl()
ReturnOpStatus PipelineExec::awaitImpl()
{
assert(awaitable);
auto op_status = awaitable->await();
if (op_status == OperatorStatus::IO_IN || op_status == OperatorStatus::IO_OUT)
if (op_status.status == OperatorStatus::IO_IN || op_status.status == OperatorStatus::IO_OUT)
fillIOOp(awaitable);
if (op_status != OperatorStatus::WAITING)
if (op_status.status != OperatorStatus::WAITING)
awaitable = nullptr;
return op_status;
}
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ class PipelineExec : private boost::noncopyable
void executePrefix();
void executeSuffix();

OperatorStatus execute();
ReturnOpStatus execute();

OperatorStatus executeIO();
ReturnOpStatus executeIO();

OperatorStatus await();
ReturnOpStatus await();

void finalizeProfileInfo(UInt64 extra_time);

private:
inline OperatorStatus executeImpl();
inline ReturnOpStatus executeImpl();

inline OperatorStatus executeIOImpl();
inline ReturnOpStatus executeIOImpl();

inline OperatorStatus awaitImpl();
inline ReturnOpStatus awaitImpl();

inline OperatorStatus fetchBlock(Block & block, size_t & start_transform_op_index);
inline ReturnOpStatus fetchBlock(Block & block, size_t & start_transform_op_index);

ALWAYS_INLINE void fillAwaitable(Operator * op)
{
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SimpleGetResultSinkOp : public SinkOp
String getName() const override { return "SimpleGetResultSinkOp"; }

protected:
OperatorStatus writeImpl(Block && block) override
ReturnOpStatus writeImpl(Block && block) override
{
if (!block)
return OperatorStatus::FINISHED;
Expand Down Expand Up @@ -121,7 +121,7 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest
}};
PipelineExecutorContext exec_context;
auto op_pipeline = build(request, result_handler, exec_context);
while (op_pipeline->execute() != OperatorStatus::FINISHED) {}
while (op_pipeline->execute().status != OperatorStatus::FINISHED) {}
ASSERT_COLUMNS_EQ_UR(expect_columns, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName());
}
};
Expand All @@ -140,7 +140,7 @@ try
PipelineExecutorContext exec_context;
auto op_pipeline = build(request, result_handler, exec_context);
exec_context.cancel();
ASSERT_EQ(op_pipeline->execute(), OperatorStatus::CANCELLED);
ASSERT_EQ(op_pipeline->execute().status, OperatorStatus::CANCELLED);
}
CATCH

Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class BaseTask : public EventTask
{}

protected:
ExecTaskStatus executeImpl() override
ReturnStatus executeImpl() override
{
--counter;
return ExecTaskStatus::FINISHED;
Expand Down Expand Up @@ -75,7 +75,7 @@ class RunTask : public EventTask
{}

protected:
ExecTaskStatus executeImpl() override
ReturnStatus executeImpl() override
{
while ((--loop_count) > 0)
return ExecTaskStatus::RUNNING;
Expand Down Expand Up @@ -116,7 +116,7 @@ class DeadLoopTask : public EventTask
{}

protected:
ExecTaskStatus executeImpl() override
ReturnStatus executeImpl() override
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return ExecTaskStatus::RUNNING;
Expand Down Expand Up @@ -185,7 +185,7 @@ class ThrowExceptionTask : public EventTask
{}

protected:
ExecTaskStatus executeImpl() override { throw Exception("throw exception in doExecuteImpl"); }
ReturnStatus executeImpl() override { throw Exception("throw exception in doExecuteImpl"); }
};

class ThrowExceptionEvent : public Event
Expand Down Expand Up @@ -291,7 +291,7 @@ class TestPorfileTask : public EventTask

protected:
// executeImpl min_time ==> executeIOImpl min_time ==> awaitImpl min_time.
ExecTaskStatus executeImpl() override
ReturnStatus executeImpl() override
{
if (cpu_execute_time < min_time)
{
Expand All @@ -302,7 +302,7 @@ class TestPorfileTask : public EventTask
return ExecTaskStatus::IO_IN;
}

ExecTaskStatus executeIOImpl() override
ReturnStatus executeIOImpl() override
{
if (io_execute_time < min_time)
{
Expand All @@ -313,7 +313,7 @@ class TestPorfileTask : public EventTask
return ExecTaskStatus::WAITING;
}

ExecTaskStatus awaitImpl() override
ReturnStatus awaitImpl() override
{
if unlikely (!wait_stopwatch)
wait_stopwatch.emplace(CLOCK_MONOTONIC_COARSE);
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/setThreadName.h>
#include <Flash/Pipeline/Schedule/Reactor/WaitReactor.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>
#include <Flash/Pipeline/Schedule/Tasks/TaskHelper.h>
#include <common/logger_useful.h>
#include <errno.h>
Expand All @@ -29,15 +30,16 @@ WaitReactor::WaitReactor(TaskScheduler & scheduler_)
: scheduler(scheduler_)
{
GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count).Set(0);
GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count).Set(0);
thread = std::thread(&WaitReactor::loop, this);
}

bool WaitReactor::awaitAndCollectReadyTask(WaitingTask && task)
{
assert(task.first);
auto * task_ptr = task.second;
auto status = task_ptr->await();
switch (status)
auto return_status = task_ptr->await();
switch (return_status.status)
{
case ExecTaskStatus::WAITING:
return false;
Expand All @@ -50,6 +52,10 @@ bool WaitReactor::awaitAndCollectReadyTask(WaitingTask && task)
task_ptr->profile_info.elapsedAwaitTime();
io_tasks.push_back(std::move(task.first));
return true;
case ExecTaskStatus::WAIT_FOR_NOTIFY:
assert(return_status.future);
return_status.future->registerTask(std::move(task.first));
return true;
case FINISH_STATUS:
task_ptr->profile_info.elapsedAwaitTime();
task_ptr->startTraceMemory();
Expand All @@ -58,7 +64,7 @@ bool WaitReactor::awaitAndCollectReadyTask(WaitingTask && task)
task.first.reset();
return true;
default:
UNEXPECTED_STATUS(logger, status);
UNEXPECTED_STATUS(logger, return_status.status);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class PlainTask : public Task
: Task(exec_context_)
{}

ExecTaskStatus executeImpl() noexcept override { return ExecTaskStatus::FINISHED; }
ReturnStatus executeImpl() noexcept override { return ExecTaskStatus::FINISHED; }
};
} // namespace

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace tests
: Task(exec_context) \
{} \
\
ExecTaskStatus executeImpl() override \
ReturnStatus executeImpl() override \
{ \
if (task_exec_cur_count <= task_exec_total_count) \
{ \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MockIOTask : public Task
: Task(exec_context_, "", is_io_in ? ExecTaskStatus::IO_IN : ExecTaskStatus::IO_OUT)
{}

ExecTaskStatus executeImpl() noexcept override { return ExecTaskStatus::FINISHED; }
ReturnStatus executeImpl() noexcept override { return ExecTaskStatus::FINISHED; }
};
} // namespace

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class PlainTask : public Task
: Task(exec_context_)
{}

ExecTaskStatus executeImpl() noexcept override { return ExecTaskStatus::FINISHED; }
ReturnStatus executeImpl() noexcept override { return ExecTaskStatus::FINISHED; }
};
} // namespace

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SimpleTask : public Task

~SimpleTask() override = default;

ExecTaskStatus executeImpl() noexcept override
ReturnStatus executeImpl() noexcept override
{
if (exec_time_counter < total_exec_times)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void AggregateFinalSpillTask::doFinalizeImpl()
agg_context.reset();
}

ExecTaskStatus AggregateFinalSpillTask::executeIOImpl()
ReturnStatus AggregateFinalSpillTask::executeIOImpl()
{
agg_context->spillData(index);
return ExecTaskStatus::FINISHED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class AggregateFinalSpillTask : public OutputIOEventTask
size_t index_);

protected:
ExecTaskStatus executeIOImpl() override;
ReturnStatus executeIOImpl() override;

void doFinalizeImpl() override;

Expand Down
Loading