From 9d3ee888fa30fb773b8ac4943d9e1f7679cb1b9e Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 30 Aug 2022 16:02:20 +0800 Subject: [PATCH 1/4] save --- dbms/src/TestUtils/executorSerializer.cpp | 74 ++++++++++++------- dbms/src/TestUtils/executorSerializer.h | 3 +- dbms/src/TestUtils/mockExecutor.cpp | 15 ++++ dbms/src/TestUtils/mockExecutor.h | 1 + .../TestUtils/tests/gtest_mock_executors.cpp | 11 +++ 5 files changed, 78 insertions(+), 26 deletions(-) diff --git a/dbms/src/TestUtils/executorSerializer.cpp b/dbms/src/TestUtils/executorSerializer.cpp index a0ae4b11270..5910fde5710 100644 --- a/dbms/src/TestUtils/executorSerializer.cpp +++ b/dbms/src/TestUtils/executorSerializer.cpp @@ -57,29 +57,6 @@ void toString(const Columns & columns, FmtBuffer & buf) } buf.fmtAppend("<{}, {}>", bound, getColumnTypeName(columns.at(bound))); } -} // namespace - -String ExecutorSerializer::serialize(const tipb::DAGRequest * dag_request) -{ - assert((dag_request->executors_size() > 0) != dag_request->has_root_executor()); - if (dag_request->has_root_executor()) - { - serialize(dag_request->root_executor(), 0); - return buf.toString(); - } - else - { - FmtBuffer buffer; - String prefix; - traverseExecutors(dag_request, [this, &prefix](const tipb::Executor & executor) { - assert(executor.has_executor_id()); - buf.fmtAppend("{}{}\n", prefix, executor.executor_id()); - prefix.append(" "); - return true; - }); - return buffer.toString(); - } -} void serializeTableScan(const String & executor_id, const tipb::TableScan & ts, FmtBuffer & buf) { @@ -263,8 +240,55 @@ void serializeSort(const String & executor_id, const tipb::Sort & sort [[maybe_u ", "); buf.append("}\n"); } +} // namespace + +String ExecutorSerializer::serialize(const tipb::DAGRequest * dag_request) +{ + assert((dag_request->executors_size() > 0) != dag_request->has_root_executor()); + if (dag_request->has_root_executor()) + { + serializeTreeStruct(dag_request->root_executor(), 0); + } + else + { + serializeListStruct(dag_request); + } + return buf.toString(); +} + +void ExecutorSerializer::serializeListStruct(const tipb::DAGRequest * dag_request) +{ + String prefix; + traverseExecutors(dag_request, [this, &prefix](const tipb::Executor & executor) { + buf.append(prefix); + switch (executor.tp()) + { + case tipb::ExecType::TypeTableScan: + serializeTableScan("TableScan", executor.tbl_scan(), buf); + break; + case tipb::ExecType::TypeSelection: + serializeSelection("Selection", executor.selection(), buf); + break; + case tipb::ExecType::TypeAggregation: + // stream agg is not supported, treated as normal agg + case tipb::ExecType::TypeStreamAgg: + serializeAggregation("Aggregation", executor.aggregation(), buf); + break; + case tipb::ExecType::TypeTopN: + serializeTopN("TopN", executor.topn(), buf); + break; + case tipb::ExecType::TypeLimit: + serializeLimit("Limit", executor.limit(), buf); + break; + default: + throw TiFlashException("Should not reach here", Errors::Coprocessor::Internal); + } + prefix.append(" "); + return true; + }); +} -void ExecutorSerializer::serialize(const tipb::Executor & root_executor, size_t level) +void ExecutorSerializer::serializeTreeStruct(const tipb::Executor & root_executor, size_t level) { auto append_str = [&level, this](const tipb::Executor & executor) { assert(executor.has_executor_id()); @@ -324,7 +348,7 @@ void ExecutorSerializer::serialize(const tipb::Executor & root_executor, size_t if (executor.has_join()) { for (const auto & child : executor.join().children()) - serialize(child, level); + serializeTreeStruct(child, level); return false; } return true; diff --git a/dbms/src/TestUtils/executorSerializer.h b/dbms/src/TestUtils/executorSerializer.h index 048c0564250..e2b8c77be88 100644 --- a/dbms/src/TestUtils/executorSerializer.h +++ b/dbms/src/TestUtils/executorSerializer.h @@ -28,7 +28,8 @@ class ExecutorSerializer String serialize(const tipb::DAGRequest * dag_request); private: - void serialize(const tipb::Executor & root_executor, size_t level); + void serializeListStruct(const tipb::DAGRequest * dag_request); + void serializeTreeStruct(const tipb::Executor & root_executor, size_t level); void addPrefix(size_t level) { buf.append(String(level, ' ')); } private: diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index 960c686ae8b..f5e5ff5fd57 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -96,6 +97,20 @@ std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext return dag_request_ptr; } +std::shared_ptr DAGRequestBuilder::buildToListStruct(MockDAGRequestContext & mock_context) +{ + auto tree_struct_req = build(mock_context); + std::shared_ptr list_struct_req = std::make_shared(); + auto & mutable_executors = *list_struct_req->mutable_executors(); + traverseExecutors(tree_struct_req.get(), [&](const tipb::Executor & executor) -> bool { + auto * mutable_executor = mutable_executors.Add(); + (*mutable_executor) = executor; + mutable_executor->clear_executor_id(); + return true; + }); + return list_struct_req; +} + // Currently Sort and Window Executors don't support columnPrune. // TODO: support columnPrume for Sort and Window. void columnPrune(ExecutorPtr executor) diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 45e56e63264..fe133569c0c 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -71,6 +71,7 @@ class DAGRequestBuilder } std::shared_ptr build(MockDAGRequestContext & mock_context); + std::shared_ptr buildToListStruct(MockDAGRequestContext & mock_context); QueryTasks buildMPPTasks(MockDAGRequestContext & mock_context); QueryTasks buildMPPTasks(MockDAGRequestContext & mock_context, const DAGProperties & properties); diff --git a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp index ad9c7790f9e..ea5f1340ea3 100644 --- a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp +++ b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp @@ -340,5 +340,16 @@ try } } CATCH + +TEST_F(MockDAGRequestTest, ListBase) +try +{ + auto request = context.scan("test_db", "test_table").filter(eq(col("s1"), col("s2"))).buildToListStruct(context); + { + String expected = R"()"; + ASSERT_DAGREQUEST_EQAUL(expected, request); + } +} +CATCH } // namespace tests } // namespace DB From 223b39c7dfe209f80c780986dd99b6fb52a97c8e Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 30 Aug 2022 17:33:05 +0800 Subject: [PATCH 2/4] add mock executor ut --- dbms/src/TestUtils/mockExecutor.cpp | 2 +- .../TestUtils/tests/gtest_mock_executors.cpp | 32 +++++++++++++++++-- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index f5e5ff5fd57..94bc220e505 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -102,7 +102,7 @@ std::shared_ptr DAGRequestBuilder::buildToListStruct(MockDAGRe auto tree_struct_req = build(mock_context); std::shared_ptr list_struct_req = std::make_shared(); auto & mutable_executors = *list_struct_req->mutable_executors(); - traverseExecutors(tree_struct_req.get(), [&](const tipb::Executor & executor) -> bool { + traverseExecutorsReverse(tree_struct_req.get(), [&](const tipb::Executor & executor) -> bool { auto * mutable_executor = mutable_executors.Add(); (*mutable_executor) = executor; mutable_executor->clear_executor_id(); diff --git a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp index ea5f1340ea3..a859ddb7c44 100644 --- a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp +++ b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp @@ -344,9 +344,37 @@ CATCH TEST_F(MockDAGRequestTest, ListBase) try { - auto request = context.scan("test_db", "test_table").filter(eq(col("s1"), col("s2"))).buildToListStruct(context); { - String expected = R"()"; + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .filter(eq(col("s2"), lit(Field("1", 1)))) + .limit(10) + .buildToListStruct(context); + String expected = R"( +Limit | 10 + Selection | equals(<1, String>, <-5692549928996306944, String>)} + Aggregation | group_by: {<1, String>}, agg_func: {max(<0, String>)} + Selection | equals(<0, String>, <1, String>)} + TableScan | {<0, String>, <1, String>})"; + ASSERT_DAGREQUEST_EQAUL(expected, request); + } + + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .filter(eq(col("s2"), lit(Field("1", 1)))) + .topN("s2", false, 10) + .buildToListStruct(context); + String expected = R"( +TopN | order_by: {(<1, String>, desc: false)}, limit: 10 + Selection | equals(<1, String>, <-5692549928996306944, String>)} + Aggregation | group_by: {<1, String>}, agg_func: {max(<0, String>)} + Selection | equals(<0, String>, <1, String>)} + TableScan | {<0, String>, <1, String>})"; ASSERT_DAGREQUEST_EQAUL(expected, request); } } From ab3008feed7f147f7e78f09866a315f20f172a16 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 30 Aug 2022 18:01:12 +0800 Subject: [PATCH 3/4] add test --- dbms/src/Flash/tests/gtest_interpreter.cpp | 45 ++++++++++++++++ .../Flash/tests/gtest_planner_interpreter.cpp | 52 +++++++++++++++++++ dbms/src/TestUtils/mockExecutor.cpp | 10 ++-- 3 files changed, 102 insertions(+), 5 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index a053c2743fb..7f4ff2808ed 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -631,5 +631,50 @@ CreatingSets } CATCH +TEST_F(InterpreterExecuteTest, ListBase) +try +{ + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .limit(10) + .buildToListStruct(context); + String expected = R"( +Limit, limit = 10 + Expression: + Aggregating + Concat + Expression: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .topN("s2", false, 10) + .buildToListStruct(context); + String expected = R"( +Union: + SharedQuery x 20: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 20: limit = 10 + SharedQuery: + ParallelAggregating, max_threads: 20, final: true + Expression x 20: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 20); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp index 038af91f725..0e733e2ed2a 100644 --- a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -984,5 +984,57 @@ CreatingSets } CATCH +TEST_F(PlannerInterpreterExecuteTest, ListBase) +try +{ + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .filter(eq(col("s2"), lit(Field("1", 1)))) + .limit(10) + .buildToListStruct(context); + String expected = R"( +Expression: + Limit, limit = 10 + Filter + Expression: + Aggregating + Concat + Expression: + Filter + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + { + auto request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .aggregation(Max(col("s1")), col("s2")) + .filter(eq(col("s2"), lit(Field("1", 1)))) + .topN("s2", false, 10) + .buildToListStruct(context); + String expected = R"( +Union: + Expression x 20: + SharedQuery: + MergeSorting, limit = 10 + Union: + PartialSorting x 20: limit = 10 + Expression: + Filter + Expression: + SharedQuery: + ParallelAggregating, max_threads: 20, final: true + Expression x 20: + Filter + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 20); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index 94bc220e505..be17678f1fd 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -99,16 +99,16 @@ std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext std::shared_ptr DAGRequestBuilder::buildToListStruct(MockDAGRequestContext & mock_context) { - auto tree_struct_req = build(mock_context); - std::shared_ptr list_struct_req = std::make_shared(); - auto & mutable_executors = *list_struct_req->mutable_executors(); - traverseExecutorsReverse(tree_struct_req.get(), [&](const tipb::Executor & executor) -> bool { + auto dag_request_ptr = build(mock_context); + auto & mutable_executors = *dag_request_ptr->mutable_executors(); + traverseExecutorsReverse(dag_request_ptr.get(), [&](const tipb::Executor & executor) -> bool { auto * mutable_executor = mutable_executors.Add(); (*mutable_executor) = executor; mutable_executor->clear_executor_id(); return true; }); - return list_struct_req; + dag_request_ptr->release_root_executor(); + return dag_request_ptr; } // Currently Sort and Window Executors don't support columnPrune. From e487c26bd57d66af2f845ec41e830566108090fd Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 30 Aug 2022 18:26:44 +0800 Subject: [PATCH 4/4] address comment --- dbms/src/Flash/tests/gtest_interpreter.cpp | 4 +-- .../Flash/tests/gtest_planner_interpreter.cpp | 4 +-- dbms/src/TestUtils/mockExecutor.cpp | 29 ++++++++++--------- dbms/src/TestUtils/mockExecutor.h | 9 ++++-- .../TestUtils/tests/gtest_mock_executors.cpp | 4 +-- 5 files changed, 28 insertions(+), 22 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index 7f4ff2808ed..ff0d2f05e34 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -640,7 +640,7 @@ try .filter(eq(col("s1"), col("s2"))) .aggregation(Max(col("s1")), col("s2")) .limit(10) - .buildToListStruct(context); + .build(context, DAGRequestType::list); String expected = R"( Limit, limit = 10 Expression: @@ -658,7 +658,7 @@ Limit, limit = 10 .filter(eq(col("s1"), col("s2"))) .aggregation(Max(col("s1")), col("s2")) .topN("s2", false, 10) - .buildToListStruct(context); + .build(context, DAGRequestType::list); String expected = R"( Union: SharedQuery x 20: diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp index 0e733e2ed2a..744126d5f7c 100644 --- a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -994,7 +994,7 @@ try .aggregation(Max(col("s1")), col("s2")) .filter(eq(col("s2"), lit(Field("1", 1)))) .limit(10) - .buildToListStruct(context); + .build(context, DAGRequestType::list); String expected = R"( Expression: Limit, limit = 10 @@ -1015,7 +1015,7 @@ Expression: .aggregation(Max(col("s1")), col("s2")) .filter(eq(col("s2"), lit(Field("1", 1)))) .topN("s2", false, 10) - .buildToListStruct(context); + .build(context, DAGRequestType::list); String expected = R"( Union: Expression x 20: diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index be17678f1fd..4ed72917436 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -85,8 +85,9 @@ void DAGRequestBuilder::initDAGRequest(tipb::DAGRequest & dag_request) } // traval the AST tree to build tipb::Executor recursively. -std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext & mock_context) +std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext & mock_context, DAGRequestType type) { + // build tree struct base executor MPPInfo mpp_info(properties.start_ts, -1, -1, {}, mock_context.receiver_source_task_ids_map); std::shared_ptr dag_request_ptr = std::make_shared(); tipb::DAGRequest & dag_request = *dag_request_ptr; @@ -94,20 +95,20 @@ std::shared_ptr DAGRequestBuilder::build(MockDAGRequestContext root->toTiPBExecutor(dag_request.mutable_root_executor(), properties.collator, mpp_info, mock_context.context); root.reset(); executor_index = 0; - return dag_request_ptr; -} -std::shared_ptr DAGRequestBuilder::buildToListStruct(MockDAGRequestContext & mock_context) -{ - auto dag_request_ptr = build(mock_context); - auto & mutable_executors = *dag_request_ptr->mutable_executors(); - traverseExecutorsReverse(dag_request_ptr.get(), [&](const tipb::Executor & executor) -> bool { - auto * mutable_executor = mutable_executors.Add(); - (*mutable_executor) = executor; - mutable_executor->clear_executor_id(); - return true; - }); - dag_request_ptr->release_root_executor(); + // convert to list struct base executor + if (type == DAGRequestType::list) + { + auto & mutable_executors = *dag_request_ptr->mutable_executors(); + traverseExecutorsReverse(dag_request_ptr.get(), [&](const tipb::Executor & executor) -> bool { + auto * mutable_executor = mutable_executors.Add(); + (*mutable_executor) = executor; + mutable_executor->clear_executor_id(); + return true; + }); + dag_request_ptr->release_root_executor(); + } + return dag_request_ptr; } diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index fe133569c0c..0b6850669fb 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -43,6 +43,12 @@ inline int32_t convertToTiDBCollation(int32_t collation) return -(abs(collation)); } +enum class DAGRequestType +{ + tree, + list, +}; + /** Responsible for Hand write tipb::DAGRequest * Use this class to mock DAGRequest, then feed the DAGRequest into * the Interpreter for test purpose. @@ -70,8 +76,7 @@ class DAGRequestBuilder return root; } - std::shared_ptr build(MockDAGRequestContext & mock_context); - std::shared_ptr buildToListStruct(MockDAGRequestContext & mock_context); + std::shared_ptr build(MockDAGRequestContext & mock_context, DAGRequestType type = DAGRequestType::tree); QueryTasks buildMPPTasks(MockDAGRequestContext & mock_context); QueryTasks buildMPPTasks(MockDAGRequestContext & mock_context, const DAGProperties & properties); diff --git a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp index a859ddb7c44..5a58c8fb2f4 100644 --- a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp +++ b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp @@ -351,7 +351,7 @@ try .aggregation(Max(col("s1")), col("s2")) .filter(eq(col("s2"), lit(Field("1", 1)))) .limit(10) - .buildToListStruct(context); + .build(context, DAGRequestType::list); String expected = R"( Limit | 10 Selection | equals(<1, String>, <-5692549928996306944, String>)} @@ -368,7 +368,7 @@ Limit | 10 .aggregation(Max(col("s1")), col("s2")) .filter(eq(col("s2"), lit(Field("1", 1)))) .topN("s2", false, 10) - .buildToListStruct(context); + .build(context, DAGRequestType::list); String expected = R"( TopN | order_by: {(<1, String>, desc: false)}, limit: 10 Selection | equals(<1, String>, <-5692549928996306944, String>)}