Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test: Refine fine-grained shuffle ut #6557

Merged
merged 3 commits into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockExecutor/AggregationBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ void AggregationBinder::toMPPSubPlan(size_t & executor_index, const DAGPropertie
}

std::shared_ptr<ExchangeSenderBinder> exchange_sender
= std::make_shared<ExchangeSenderBinder>(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys);
= std::make_shared<ExchangeSenderBinder>(executor_index, output_schema_for_partial_agg, partition_keys.empty() ? tipb::PassThrough : tipb::Hash, partition_keys, fine_grained_shuffle_stream_count);
exchange_sender->children.push_back(partial_agg);

std::shared_ptr<ExchangeReceiverBinder> exchange_receiver
= std::make_shared<ExchangeReceiverBinder>(executor_index, output_schema_for_partial_agg);
= std::make_shared<ExchangeReceiverBinder>(executor_index, output_schema_for_partial_agg, fine_grained_shuffle_stream_count);
exchange_map[exchange_receiver->name] = std::make_pair(exchange_receiver, exchange_sender);

/// re-construct agg_exprs and gby_exprs in final_agg
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_
tipb_executor->set_executor_id(name);
tipb::ExchangeSender * exchange_sender = tipb_executor->mutable_exchange_sender();
exchange_sender->set_tp(type);
if (tipb_executor->exchange_sender().tp() == tipb::Hash)
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
for (auto i : partition_keys)
{
auto * expr = exchange_sender->add_partition_keys();
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ namespace DB::mock
class ExchangeSenderBinder : public ExecutorBinder
{
public:
ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector<size_t> & partition_keys_ = {})
ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector<size_t> & partition_keys_ = {}, uint64_t fine_grained_shuffle_stream_count_ = 0)
: ExecutorBinder(index, "exchange_sender_" + std::to_string(index), output)
, type(type_)
, partition_keys(partition_keys_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}

bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
Expand All @@ -38,6 +39,7 @@ class ExchangeSenderBinder : public ExecutorBinder
tipb::ExchangeType type;
TaskMetas task_metas;
std::vector<size_t> partition_keys;
uint64_t fine_grained_shuffle_stream_count;
};

ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Debug/MockExecutor/JoinBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,16 @@ void JoinBinder::toMPPSubPlan(size_t & executor_index, const DAGProperties & pro
}

std::shared_ptr<ExchangeSenderBinder> left_exchange_sender
= std::make_shared<ExchangeSenderBinder>(executor_index, children[0]->output_schema, tipb::Hash, left_partition_keys);
= std::make_shared<ExchangeSenderBinder>(executor_index, children[0]->output_schema, tipb::Hash, left_partition_keys, fine_grained_shuffle_stream_count);
left_exchange_sender->children.push_back(children[0]);
std::shared_ptr<ExchangeSenderBinder> right_exchange_sender
= std::make_shared<ExchangeSenderBinder>(executor_index, children[1]->output_schema, tipb::Hash, right_partition_keys);
= std::make_shared<ExchangeSenderBinder>(executor_index, children[1]->output_schema, tipb::Hash, right_partition_keys, fine_grained_shuffle_stream_count);
right_exchange_sender->children.push_back(children[1]);

std::shared_ptr<ExchangeReceiverBinder> left_exchange_receiver
= std::make_shared<ExchangeReceiverBinder>(executor_index, children[0]->output_schema);
= std::make_shared<ExchangeReceiverBinder>(executor_index, children[0]->output_schema, fine_grained_shuffle_stream_count);
std::shared_ptr<ExchangeReceiverBinder> right_exchange_receiver
= std::make_shared<ExchangeReceiverBinder>(executor_index, children[1]->output_schema);
= std::make_shared<ExchangeReceiverBinder>(executor_index, children[1]->output_schema, fine_grained_shuffle_stream_count);
children[0] = left_exchange_receiver;
children[1] = right_exchange_receiver;

Expand Down
19 changes: 9 additions & 10 deletions dbms/src/Flash/tests/gtest_aggregation_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace tests
types_col_name[a], types_col_name[b] \
}

class ExecutorAggTestRunner : public ExecutorTest
class AggExecutorTestRunner : public ExecutorTest
{
public:
using ColStringNullableType = std::optional<typename TypeTraits<String>::FieldType>;
Expand Down Expand Up @@ -61,7 +61,7 @@ class ExecutorAggTestRunner : public ExecutorTest
using ColumnWithFloat64 = std::vector<ColFloat64Type>;
using ColumnWithString = std::vector<ColStringType>;

~ExecutorAggTestRunner() override = default;
~AggExecutorTestRunner() override = default;

void initializeContext() override
{
Expand Down Expand Up @@ -225,7 +225,7 @@ class ExecutorAggTestRunner : public ExecutorTest
};

/// Guarantee the correctness of group by
TEST_F(ExecutorAggTestRunner, GroupBy)
TEST_F(AggExecutorTestRunner, GroupBy)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -296,7 +296,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggregationMaxAndMin)
TEST_F(AggExecutorTestRunner, AggregationMaxAndMin)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -344,7 +344,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggregationCount)
TEST_F(AggExecutorTestRunner, AggregationCount)
try
{
/// Prepare some data
Expand Down Expand Up @@ -388,8 +388,7 @@ CATCH

// TODO: support more types for min, max, count.
// Support more aggregation functions: sum, first_row, group_concat.

TEST_F(ExecutorAggTestRunner, AggregationCountGroupByFastPathMultiKeys)
TEST_F(AggExecutorTestRunner, AggregationCountGroupByFastPathMultiKeys)
try
{
/// Prepare some data
Expand Down Expand Up @@ -492,7 +491,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggNull)
TEST_F(AggExecutorTestRunner, AggNull)
try
{
auto request = context
Expand All @@ -509,7 +508,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, RepeatedAggregateFunction)
TEST_F(AggExecutorTestRunner, RepeatedAggregateFunction)
try
{
std::vector<ASTPtr> functions = {Max(col("s1")), Min(col("s1")), Sum(col("s2"))};
Expand Down Expand Up @@ -557,7 +556,7 @@ try
}
CATCH

TEST_F(ExecutorAggTestRunner, AggMerge)
TEST_F(AggExecutorTestRunner, AggMerge)
try
{
std::vector<String> tables{"big_table_1", "big_table_2", "big_table_3"};
Expand Down
52 changes: 0 additions & 52 deletions dbms/src/Flash/tests/gtest_compute_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,49 +502,6 @@ try
}
CATCH

/// Test helper for the FineGrainedShuffle join/agg cases.
///
/// Starting at `root`, follows one child per executor down the tree and sets the
/// fine-grained shuffle stream count to 8 on each exchange sender whose exchange type is
/// tipb::Hash, and on the exchange receiver that ends the walk.
///
/// Accepted executor types: selection, aggregation, projection, join, exchange sender,
/// exchange receiver and table scan. Any other type throws TiFlashException.
/// For a join, only the build-side child is followed.
void setFineGrainedShuffleForExchange(tipb::Executor & root)
{
    for (tipb::Executor * node = &root; node != nullptr;)
    {
        /// Child to visit on the next iteration; stays null when the walk ends here.
        const tipb::Executor * next = nullptr;

        switch (node->tp())
        {
        case tipb::ExecType::TypeSelection:
            next = &node->selection().child();
            break;
        case tipb::ExecType::TypeAggregation:
            next = &node->aggregation().child();
            break;
        case tipb::ExecType::TypeProjection:
            next = &node->projection().child();
            break;
        case tipb::ExecType::TypeJoin:
        {
            /// Descend into the build side only.
            JoinInterpreterHelper::TiFlashJoin tiflash_join{node->join()};
            next = &node->join().children()[tiflash_join.build_side_index];
            break;
        }
        case tipb::ExecType::TypeExchangeSender:
            /// Only hash-partitioned senders get the stream count.
            if (node->exchange_sender().tp() == tipb::Hash)
                node->set_fine_grained_shuffle_stream_count(8);
            next = &node->exchange_sender().child();
            break;
        case tipb::ExecType::TypeExchangeReceiver:
            /// The receiver is updated and the walk stops.
            node->set_fine_grained_shuffle_stream_count(8);
            break;
        case tipb::ExecType::TypeTableScan:
            /// Nothing to update; the walk stops.
            break;
        default:
            throw TiFlashException("Should not reach here", Errors::Coprocessor::Internal);
        }

        /// The protobuf getters return const references; the cast restores mutability
        /// so the next executor on the path can be updated in place.
        node = const_cast<tipb::Executor *>(next);
    }
}

TEST_F(ComputeServerRunner, runFineGrainedShuffleJoinTest)
try
{
Expand Down Expand Up @@ -578,10 +535,6 @@ try
.join(context.scan("test_db", "r_table_2"), join_type, {col("s1"), col("s2")}, enable)
.project({col("l_table_2.s1"), col("l_table_2.s2"), col("l_table_2.s3")});
auto tasks = request2.buildMPPTasks(context, properties);
for (auto & task : tasks)
{
setFineGrainedShuffleForExchange(const_cast<tipb::Executor &>(task.dag_request->root_executor()));
}
const auto actual_cols = executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap());
ASSERT_COLUMNS_EQ_UR(expected_cols, actual_cols);
}
Expand All @@ -606,11 +559,6 @@ try
.scan("test_db", "test_table_2")
.aggregation({Max(col("s3"))}, {col("s1"), col("s2")}, enable);
auto tasks = request2.buildMPPTasks(context, properties);
for (auto & task : tasks)
{
setFineGrainedShuffleForExchange(const_cast<tipb::Executor &>(task.dag_request->root_executor()));
}

const auto actual_cols = executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap());
ASSERT_COLUMNS_EQ_UR(expected_cols, actual_cols);
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/tests/gtest_limit_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace DB
namespace tests
{

class ExecutorLimitTestRunner : public DB::tests::ExecutorTest
class LimitExecutorTestRunner : public DB::tests::ExecutorTest
{
public:
using ColDataType = std::optional<typename TypeTraits<String>::FieldType>;
Expand All @@ -47,7 +47,7 @@ class ExecutorLimitTestRunner : public DB::tests::ExecutorTest
const ColumnWithData col0{"col0-0", {}, "col0-2", "col0-3", {}, "col0-5", "col0-6", "col0-7"};
};

TEST_F(ExecutorLimitTestRunner, Limit)
TEST_F(LimitExecutorTestRunner, Limit)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -77,7 +77,7 @@ try
}
CATCH

TEST_F(ExecutorLimitTestRunner, RawQuery)
TEST_F(LimitExecutorTestRunner, RawQuery)
try
{
String query = "select * from test_db.projection_test_table limit 1";
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Flash/tests/gtest_projection_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace DB
namespace tests
{

class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest
class ProjectionExecutorTestRunner : public DB::tests::ExecutorTest
{
public:
using ColDataString = std::vector<std::optional<typename TypeTraits<String>::FieldType>>;
Expand Down Expand Up @@ -94,7 +94,7 @@ class ExecutorProjectionTestRunner : public DB::tests::ExecutorTest
const String table_name{"projection_test_table"};
};

TEST_F(ExecutorProjectionTestRunner, Projection)
TEST_F(ProjectionExecutorTestRunner, Projection)
try
{
/// Check single column
Expand Down Expand Up @@ -141,7 +141,7 @@ try
}
CATCH

TEST_F(ExecutorProjectionTestRunner, ProjectionFunction)
TEST_F(ProjectionExecutorTestRunner, ProjectionFunction)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -231,7 +231,7 @@ try
}
CATCH

TEST_F(ExecutorProjectionTestRunner, MultiFunction)
TEST_F(ProjectionExecutorTestRunner, MultiFunction)
try
{
MockAstVec functions = {
Expand Down Expand Up @@ -308,7 +308,7 @@ try
}
CATCH

TEST_F(ExecutorProjectionTestRunner, MultiProjection)
TEST_F(ProjectionExecutorTestRunner, MultiProjection)
try
{
auto req = context
Expand Down Expand Up @@ -375,7 +375,7 @@ try
}
CATCH

TEST_F(ExecutorProjectionTestRunner, ProjectionThenAgg)
TEST_F(ProjectionExecutorTestRunner, ProjectionThenAgg)
try
{
auto req = context
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Flash/tests/gtest_topn_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ namespace DB
{
namespace tests
{
class ExecutorTopNTestRunner : public DB::tests::ExecutorTest

class TopNExecutorTestRunner : public DB::tests::ExecutorTest
{
public:
using ColStringType = std::optional<typename TypeTraits<String>::FieldType>;
Expand Down Expand Up @@ -85,7 +86,7 @@ class ExecutorTopNTestRunner : public DB::tests::ExecutorTest

std::shared_ptr<tipb::DAGRequest> buildDAGRequest(const String & table_name, MockOrderByItemVec order_by_items, int limit, MockAstVec func_proj_ast = {}, MockColumnNameVec out_proj_ast = {})
{
if (func_proj_ast.size() == 0)
if (func_proj_ast.empty())
return context.scan(db_name, table_name).topN(order_by_items, limit).build(context);
else
return context.scan(db_name, table_name).project(func_proj_ast).topN(order_by_items, limit).project(out_proj_ast).build(context);
Expand All @@ -106,7 +107,7 @@ class ExecutorTopNTestRunner : public DB::tests::ExecutorTest
ColumnWithInt32 col_salary{1300, 0, {}, 900, {}, -300};
};

TEST_F(ExecutorTopNTestRunner, TopN)
TEST_F(TopNExecutorTestRunner, TopN)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -173,7 +174,7 @@ try
}
CATCH

TEST_F(ExecutorTopNTestRunner, TopNFunction)
TEST_F(TopNExecutorTestRunner, TopNFunction)
try
{
std::shared_ptr<tipb::DAGRequest> request;
Expand Down Expand Up @@ -245,7 +246,7 @@ try
}
CATCH

TEST_F(ExecutorTopNTestRunner, BigTable)
TEST_F(TopNExecutorTestRunner, BigTable)
try
{
std::vector<String> tables{"big_table_1", "big_table_2", "big_table_3"};
Expand Down