From d987c89865369d0575fae9bc7eb3507cf95229f6 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 25 Apr 2022 21:51:01 +0800 Subject: [PATCH 01/20] introd physical plan --- dbms/src/Flash/CMakeLists.txt | 2 + dbms/src/Flash/Coprocessor/InterpreterDAG.cpp | 25 +++-- dbms/src/Flash/Planner/FinalizeHelper.cpp | 95 +++++++++++++++++++ dbms/src/Flash/Planner/FinalizeHelper.h | 20 ++++ dbms/src/Flash/Planner/PhysicalPlan.cpp | 62 ++++++++++++ dbms/src/Flash/Planner/PhysicalPlan.h | 84 ++++++++++++++++ dbms/src/Flash/Planner/PhysicalPlanHelper.cpp | 13 +++ dbms/src/Flash/Planner/PhysicalPlanHelper.h | 8 ++ dbms/src/Flash/Planner/PlanType.cpp | 42 ++++++++ dbms/src/Flash/Planner/PlanType.h | 33 +++++++ dbms/src/Flash/Planner/Planner.cpp | 67 +++++++++++++ dbms/src/Flash/Planner/Planner.h | 60 ++++++++++++ dbms/src/Flash/Planner/plans/PhysicalUnary.h | 48 ++++++++++ dbms/src/Interpreters/Settings.h | 14 +-- 14 files changed, 560 insertions(+), 13 deletions(-) create mode 100644 dbms/src/Flash/Planner/FinalizeHelper.cpp create mode 100644 dbms/src/Flash/Planner/FinalizeHelper.h create mode 100644 dbms/src/Flash/Planner/PhysicalPlan.cpp create mode 100644 dbms/src/Flash/Planner/PhysicalPlan.h create mode 100644 dbms/src/Flash/Planner/PhysicalPlanHelper.cpp create mode 100644 dbms/src/Flash/Planner/PhysicalPlanHelper.h create mode 100644 dbms/src/Flash/Planner/PlanType.cpp create mode 100644 dbms/src/Flash/Planner/PlanType.h create mode 100644 dbms/src/Flash/Planner/Planner.cpp create mode 100644 dbms/src/Flash/Planner/Planner.h create mode 100644 dbms/src/Flash/Planner/plans/PhysicalUnary.h diff --git a/dbms/src/Flash/CMakeLists.txt b/dbms/src/Flash/CMakeLists.txt index 1b9e3e0aaf8..1d3aa288f29 100644 --- a/dbms/src/Flash/CMakeLists.txt +++ b/dbms/src/Flash/CMakeLists.txt @@ -17,6 +17,8 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake) add_headers_and_sources(flash_service .) add_headers_and_sources(flash_service ./Coprocessor) add_headers_and_sources(flash_service ./Mpp) +add_headers_and_sources(flash_service ./Planner) +add_headers_and_sources(flash_service ./Planner/plans) add_headers_and_sources(flash_service ./Statistics) add_library(flash_service ${flash_service_headers} ${flash_service_sources}) diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 6b118f1dd40..fb247e3ad2a 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include namespace DB @@ -62,12 +63,24 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block) BlockInputStreams child_streams = executeQueryBlock(*child); input_streams_vec.push_back(child_streams); } - DAGQueryBlockInterpreter query_block_interpreter( - context, - input_streams_vec, - query_block, - max_streams); - return query_block_interpreter.execute(); + if (context.getSettingsRef().enable_planner && Planner::isSupported(query_block)) + { + Planner planner( + context, + input_streams_vec, + query_block, + max_streams); + return planner.execute(); + } + else + { + DAGQueryBlockInterpreter query_block_interpreter( + context, + input_streams_vec, + query_block, + max_streams); + return query_block_interpreter.execute(); + } } BlockIO InterpreterDAG::execute() diff --git a/dbms/src/Flash/Planner/FinalizeHelper.cpp b/dbms/src/Flash/Planner/FinalizeHelper.cpp new file mode 100644 index 00000000000..ad80b29e800 --- /dev/null +++ b/dbms/src/Flash/Planner/FinalizeHelper.cpp @@ -0,0 +1,95 @@ +#include +#include +#include +#include +#include + +#include + +namespace DB::FinalizeHelper +{ +void prependProjectInputIfNeed(ExpressionActionsPtr & actions, size_t columns_from_previous) +{ + if (!actions->getRequiredColumnsWithTypes().empty() + && columns_from_previous > actions->getRequiredColumnsWithTypes().size()) + { + actions->prependProjectInput(); + } +} + +void checkSchemaContainsParentRequire(const NamesAndTypes & schema, const Names & parent_require) +{ + NameSet schema_set; + for (const auto & column : schema) + schema_set.insert(column.name); + for (const auto & parent_require_column : parent_require) + { + if (unlikely(schema_set.find(parent_require_column) == schema_set.end())) + throw TiFlashException( + fmt::format("schema don't contain parent require column: {}", parent_require_column), + Errors::Coprocessor::Internal); + } +} + +void checkParentRequireContainsSchema(const Names & parent_require, const NamesAndTypes & schema) +{ + NameSet parent_require_set; + for (const auto & parent_require_column : parent_require) + parent_require_set.insert(parent_require_column); + for (const auto & schema_column : schema) + { + if (unlikely(parent_require_set.find(schema_column.name) == parent_require_set.end())) + throw TiFlashException( + fmt::format("parent require don't contain schema column: {}", schema_column.name), + Errors::Coprocessor::Internal); + } +} + +void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTypes & schema) +{ + for (const auto & schema_column : schema) + { + if (unlikely(!sample_block.has(schema_column.name))) + throw TiFlashException( + fmt::format("sample block don't contain schema column: {}", schema_column.name), + Errors::Coprocessor::Internal); + + const auto & type_in_sample_block = sample_block.getByName(schema_column.name).type->getName(); + const auto & type_in_schema = schema_column.type->getName(); + if (unlikely(type_in_sample_block != type_in_schema)) + throw TiFlashException( + fmt::format( + "the type of column `{}` in sample block `{}` is difference from the one in schema `{}`", + schema_column.name, + type_in_sample_block, + type_in_schema), + Errors::Coprocessor::Internal); + } +} + +void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block) +{ + std::unordered_map schema_map; + for (const auto & column : schema) + schema_map[column.name] = column.type; + for (const auto & sample_block_column : sample_block) + { + auto it = schema_map.find(sample_block_column.name); + if (unlikely(it == schema_map.end())) + throw TiFlashException( + fmt::format("schema don't contain sample block column: {}", sample_block_column.name), + Errors::Coprocessor::Internal); + + const auto & type_in_schema = it->second->getName(); + const auto & type_in_sample_block = sample_block_column.type->getName(); + if (unlikely(type_in_sample_block != type_in_schema)) + throw TiFlashException( + fmt::format( + "the type of column `{}` in schema `{}` is difference from the one in sample block `{}`", + sample_block_column.name, + type_in_schema, + type_in_sample_block), + Errors::Coprocessor::Internal); + } +} +} // namespace DB::FinalizeHelper diff --git a/dbms/src/Flash/Planner/FinalizeHelper.h b/dbms/src/Flash/Planner/FinalizeHelper.h new file mode 100644 index 00000000000..26fc0ffdb11 --- /dev/null +++ b/dbms/src/Flash/Planner/FinalizeHelper.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB::FinalizeHelper +{ +void prependProjectInputIfNeed(ExpressionActionsPtr & actions, size_t columns_from_previous); + +void checkSchemaContainsParentRequire(const NamesAndTypes & schema, const Names & parent_require); + +void checkParentRequireContainsSchema(const Names & parent_require, const NamesAndTypes & schema); + +void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTypes & schema); + +void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block); +} // namespace DB::FinalizeHelper diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp new file mode 100644 index 00000000000..1d4f95f3716 --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -0,0 +1,62 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +namespace DB +{ +String PhysicalPlan::toString() +{ + auto schema_to_string = [&]() { + FmtBuffer buffer; + buffer.joinStr( + schema.cbegin(), + schema.cend(), + [](const auto & item, FmtBuffer & buf) { buf.fmtAppend("<{}, {}>", item.name, item.type->getName()); }, + ", "); + return buffer.toString(); + }; + return fmt::format( + "type: {}, executor_id: {}, is_record_profile_streams: {}, schema: {}", + DB::toString(type), + executor_id, + is_record_profile_streams, + schema_to_string()); +} + +void PhysicalPlan::finalize() +{ + finalize(PhysicalPlanHelper::schemaToNames(schema)); +} + +void PhysicalPlan::recordProfileStreams(DAGPipeline & pipeline, const Context & context) +{ + if (is_record_profile_streams) + { + auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id]; + pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); + } +} + +void PhysicalPlan::transform(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + transformImpl(pipeline, context, max_streams); + recordProfileStreams(pipeline, context); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlan.h b/dbms/src/Flash/Planner/PhysicalPlan.h new file mode 100644 index 00000000000..49959cc10ad --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlan.h @@ -0,0 +1,84 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +struct DAGPipeline; +class Context; +class DAGContext; + +class PhysicalPlan; +using PhysicalPlanPtr = std::shared_ptr; + +class PhysicalPlan +{ +public: + PhysicalPlan(const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, const String & req_id) + : executor_id(executor_id_) + , type(type_) + , schema(schema_) + , log(DB::toString(type_), req_id) + {} + + virtual ~PhysicalPlan() = default; + + virtual PhysicalPlanPtr children(size_t /*i*/) const = 0; + + virtual void setChild(size_t /*i*/, const PhysicalPlanPtr & /*new_child*/) = 0; + + const PlanType & tp() const { return type; } + + const String & execId() const { return executor_id; } + + const NamesAndTypes & getSchema() const { return schema; } + + virtual void appendChild(const PhysicalPlanPtr & /*new_child*/) = 0; + + virtual size_t childrenSize() const = 0; + + virtual void transform(DAGPipeline & pipeline, Context & context, size_t max_streams); + + virtual void finalize(const Names & parent_require) = 0; + void finalize(); + + /// Obtain a sample block that contains the names and types of result columns. + virtual const Block & getSampleBlock() const = 0; + + void disableRecordProfileStreams() { is_record_profile_streams = false; } + + String toString(); + +protected: + virtual void transformImpl(DAGPipeline & /*pipeline*/, Context & /*context*/, size_t /*max_streams*/){}; + + void recordProfileStreams(DAGPipeline & pipeline, const Context & context); + + String executor_id; + PlanType type; + NamesAndTypes schema; + bool is_record_profile_streams = true; + + Logger log; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp new file mode 100644 index 00000000000..9d1fc4fee2c --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp @@ -0,0 +1,13 @@ +#include + +namespace DB::PhysicalPlanHelper +{ +Names schemaToNames(const NamesAndTypes & schema) +{ + Names names; + names.reserve(schema.size()); + for (const auto & column : schema) + names.push_back(column.name); + return names; +} +} // namespace DB::PhysicalPlanHelper diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.h b/dbms/src/Flash/Planner/PhysicalPlanHelper.h new file mode 100644 index 00000000000..22ccf876eac --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.h @@ -0,0 +1,8 @@ +#pragma once + +#include + +namespace DB::PhysicalPlanHelper +{ +Names schemaToNames(const NamesAndTypes & schema); +} // namespace DB::PhysicalPlanHelper diff --git a/dbms/src/Flash/Planner/PlanType.cpp b/dbms/src/Flash/Planner/PlanType.cpp new file mode 100644 index 00000000000..0b733fd74e2 --- /dev/null +++ b/dbms/src/Flash/Planner/PlanType.cpp @@ -0,0 +1,42 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ +String toString(const PlanType & plan_type) +{ + switch (plan_type) + { + case Aggregation: + return "Aggregation"; + case ExchangeSender: + return "ExchangeSender"; + case Limit: + return "Limit"; + case Projection: + return "Projection"; + case Selection: + return "Selection"; + case Source: + return "Source"; + case TopN: + return "TopN"; + default: + throw Exception("Unknown PlanType"); + } +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h new file mode 100644 index 00000000000..e9ab9cb6f67 --- /dev/null +++ b/dbms/src/Flash/Planner/PlanType.h @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +enum PlanType +{ + Aggregation, + ExchangeSender, + Limit, + Projection, + Selection, + Source, + TopN, +}; + +String toString(const PlanType & plan_type); +} // namespace DB diff --git a/dbms/src/Flash/Planner/Planner.cpp b/dbms/src/Flash/Planner/Planner.cpp new file mode 100644 index 00000000000..cd597f33998 --- /dev/null +++ b/dbms/src/Flash/Planner/Planner.cpp @@ -0,0 +1,67 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +namespace DB +{ +Planner::Planner( + Context & context_, + const std::vector & input_streams_vec_, + const DAGQueryBlock & query_block_, + size_t max_streams_) + : context(context_) + , input_streams_vec(input_streams_vec_) + , query_block(query_block_) + , max_streams(max_streams_) + , log(Logger::get("Planner", dagContext().log ? dagContext().log->identifier() : "")) +{} + +BlockInputStreams Planner::execute() +{ + DAGPipeline pipeline; + executeImpl(pipeline); + if (!pipeline.streams_with_non_joined_data.empty()) + { + executeUnion(pipeline, max_streams, log); + restorePipelineConcurrency(pipeline); + } + return pipeline.streams; +} + +bool Planner::isSupported(const DAGQueryBlock & query_block) +{ + return false; +} + +DAGContext & Planner::dagContext() const +{ + return *context.getDAGContext(); +} + +void Planner::restorePipelineConcurrency(DAGPipeline & pipeline) +{ + if (query_block.can_restore_pipeline_concurrency) + restoreConcurrency(pipeline, dagContext().final_concurrency, log); +} + +void Planner::executeImpl(DAGPipeline & pipeline) +{ +} +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Planner/Planner.h b/dbms/src/Flash/Planner/Planner.h new file mode 100644 index 00000000000..7abe258a1aa --- /dev/null +++ b/dbms/src/Flash/Planner/Planner.h @@ -0,0 +1,60 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +class Context; +class DAGContext; + +class Planner +{ +public: + Planner( + Context & context_, + const std::vector & input_streams_vec_, + const DAGQueryBlock & query_block_, + size_t max_streams_); + + ~Planner() = default; + + BlockInputStreams execute(); + + static bool isSupported(const DAGQueryBlock & query_block); + +private: + DAGContext & dagContext() const; + + void restorePipelineConcurrency(DAGPipeline & pipeline); + + void executeImpl(DAGPipeline & pipeline); + +private: + Context & context; + + std::vector input_streams_vec; + + const DAGQueryBlock & query_block; + + /// How many streams we ask for storage to produce, and in how many threads we will do further processing. + size_t max_streams = 1; + + LoggerPtr log; +}; +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Planner/plans/PhysicalUnary.h b/dbms/src/Flash/Planner/plans/PhysicalUnary.h new file mode 100644 index 00000000000..2b978b0f743 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalUnary.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +/** + * A physical plan node with single child. + */ +class PhysicalUnary : public PhysicalPlan +{ +public: + PhysicalUnary(const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, const String & req_id) + : PhysicalPlan(executor_id_, type_, schema_, req_id) + {} + + PhysicalPlanPtr children(size_t i) const override + { + RUNTIME_ASSERT(i == 0, log, fmt::format("child_index({}) should not >= childrenSize({})", i, childrenSize())); + assert(child); + return child; + } + + void setChild(size_t i, const PhysicalPlanPtr & new_child) override + { + RUNTIME_ASSERT(i == 0, log, fmt::format("child_index({}) should not >= childrenSize({})", i, childrenSize())); + assert(new_child); + assert(new_child.get() != this); + child = new_child; + } + + void appendChild(const PhysicalPlanPtr & new_child) override + { + RUNTIME_ASSERT(!child, log, fmt::format("the actual children size had be the max size({}), don't append child again", childrenSize())); + assert(new_child); + assert(new_child.get() != this); + child = new_child; + } + + size_t childrenSize() const override { return 1; }; + +protected: + PhysicalPlanPtr child; +}; +} // namespace DB diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 6adecea4d60..70503b8d857 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -209,7 +209,7 @@ struct Settings * Basically, limits are checked for each block (not every row). That is, the limits can be slightly violated. \ * Almost all limits apply only to SELECTs. \ * Almost all limits apply to each stream individually. \ - */ \ + */ \ \ M(SettingUInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it " \ "is only checked on a remote server.") \ @@ -272,8 +272,8 @@ struct Settings M(SettingUInt64, dt_segment_delta_small_column_file_size, 8388608, "Determine whether a column file in delta is small or not. 8MB by default.") \ M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.") \ M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.") \ - M(SettingUInt64, dt_bg_gc_check_interval, 5, "Background gc thread check interval, the unit is second.") \ - M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 100, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.") \ + M(SettingUInt64, dt_bg_gc_check_interval, 5, "Background gc thread check interval, the unit is second.") \ + M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 100, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.") \ M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all " \ "segments") \ M(SettingFloat, dt_bg_gc_delta_delete_ratio_to_trigger_gc, 0.3, "Trigger segment's gc when the ratio of delta delete range to stable exceeds this ratio.") \ @@ -350,13 +350,13 @@ struct Settings M(SettingUInt64, elastic_threadpool_init_cap, 400, "The size of elastic thread pool.") \ M(SettingUInt64, elastic_threadpool_shrink_period_ms, 300000, "The shrink period(ms) of elastic thread pool.") \ M(SettingBool, enable_local_tunnel, true, "Enable local data transfer between local MPP tasks.") \ - M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ - M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.")\ + M(SettingBool, enable_async_grpc_client, true, "Enable async grpc in MPP.") \ + M(SettingUInt64, grpc_completion_queue_pool_size, 0, "The size of gRPC completion queue pool. 0 means using hardware_concurrency.") \ M(SettingBool, enable_async_server, true, "Enable async rpc server.") \ M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \ M(SettingUInt64, async_cqs, 1, "grpc async cqs") \ - M(SettingUInt64, preallocated_request_count_per_poller, 20, "grpc preallocated_request_count_per_poller") - + M(SettingUInt64, preallocated_request_count_per_poller, 20, "grpc preallocated_request_count_per_poller") \ + M(SettingBool, enable_planner, true, "Enable planner") // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; From e24489acad2425f02a402fd9b5880e4c5749f1be Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 25 Apr 2022 22:51:17 +0800 Subject: [PATCH 02/20] add source --- dbms/src/Flash/Planner/FinalizeHelper.cpp | 95 ------------------- dbms/src/Flash/Planner/FinalizeHelper.h | 20 ---- .../src/Flash/Planner/PhysicalPlanBuilder.cpp | 32 +++++++ dbms/src/Flash/Planner/PhysicalPlanBuilder.h | 47 +++++++++ dbms/src/Flash/Planner/PlanType.cpp | 12 --- dbms/src/Flash/Planner/PlanType.h | 6 -- dbms/src/Flash/Planner/Planner.cpp | 14 ++- dbms/src/Flash/Planner/plans/PhysicalLeaf.h | 49 ++++++++++ dbms/src/Flash/Planner/plans/PhysicalSource.h | 55 +++++++++++ dbms/src/Flash/Planner/plans/PhysicalUnary.h | 14 +++ 10 files changed, 209 insertions(+), 135 deletions(-) delete mode 100644 dbms/src/Flash/Planner/FinalizeHelper.cpp delete mode 100644 dbms/src/Flash/Planner/FinalizeHelper.h create mode 100644 dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp create mode 100644 dbms/src/Flash/Planner/PhysicalPlanBuilder.h create mode 100644 dbms/src/Flash/Planner/plans/PhysicalLeaf.h create mode 100644 dbms/src/Flash/Planner/plans/PhysicalSource.h diff --git a/dbms/src/Flash/Planner/FinalizeHelper.cpp b/dbms/src/Flash/Planner/FinalizeHelper.cpp deleted file mode 100644 index ad80b29e800..00000000000 --- a/dbms/src/Flash/Planner/FinalizeHelper.cpp +++ /dev/null @@ -1,95 +0,0 @@ -#include -#include -#include -#include -#include - -#include - -namespace DB::FinalizeHelper -{ -void prependProjectInputIfNeed(ExpressionActionsPtr & actions, size_t columns_from_previous) -{ - if (!actions->getRequiredColumnsWithTypes().empty() - && columns_from_previous > actions->getRequiredColumnsWithTypes().size()) - { - actions->prependProjectInput(); - } -} - -void checkSchemaContainsParentRequire(const NamesAndTypes & schema, const Names & parent_require) -{ - NameSet schema_set; - for (const auto & column : schema) - schema_set.insert(column.name); - for (const auto & parent_require_column : parent_require) - { - if (unlikely(schema_set.find(parent_require_column) == schema_set.end())) - throw TiFlashException( - fmt::format("schema don't contain parent require column: {}", parent_require_column), - Errors::Coprocessor::Internal); - } -} - -void checkParentRequireContainsSchema(const Names & parent_require, const NamesAndTypes & schema) -{ - NameSet parent_require_set; - for (const auto & parent_require_column : parent_require) - parent_require_set.insert(parent_require_column); - for (const auto & schema_column : schema) - { - if (unlikely(parent_require_set.find(schema_column.name) == parent_require_set.end())) - throw TiFlashException( - fmt::format("parent require don't contain schema column: {}", schema_column.name), - Errors::Coprocessor::Internal); - } -} - -void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTypes & schema) -{ - for (const auto & schema_column : schema) - { - if (unlikely(!sample_block.has(schema_column.name))) - throw TiFlashException( - fmt::format("sample block don't contain schema column: {}", schema_column.name), - Errors::Coprocessor::Internal); - - const auto & type_in_sample_block = sample_block.getByName(schema_column.name).type->getName(); - const auto & type_in_schema = schema_column.type->getName(); - if (unlikely(type_in_sample_block != type_in_schema)) - throw TiFlashException( - fmt::format( - "the type of column `{}` in sample block `{}` is difference from the one in schema `{}`", - schema_column.name, - type_in_sample_block, - type_in_schema), - Errors::Coprocessor::Internal); - } -} - -void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block) -{ - std::unordered_map schema_map; - for (const auto & column : schema) - schema_map[column.name] = column.type; - for (const auto & sample_block_column : sample_block) - { - auto it = schema_map.find(sample_block_column.name); - if (unlikely(it == schema_map.end())) - throw TiFlashException( - fmt::format("schema don't contain sample block column: {}", sample_block_column.name), - Errors::Coprocessor::Internal); - - const auto & type_in_schema = it->second->getName(); - const auto & type_in_sample_block = sample_block_column.type->getName(); - if (unlikely(type_in_sample_block != type_in_schema)) - throw TiFlashException( - fmt::format( - "the type of column `{}` in schema `{}` is difference from the one in sample block `{}`", - sample_block_column.name, - type_in_schema, - type_in_sample_block), - Errors::Coprocessor::Internal); - } -} -} // namespace DB::FinalizeHelper diff --git a/dbms/src/Flash/Planner/FinalizeHelper.h b/dbms/src/Flash/Planner/FinalizeHelper.h deleted file mode 100644 index 26fc0ffdb11..00000000000 --- a/dbms/src/Flash/Planner/FinalizeHelper.h +++ /dev/null @@ -1,20 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -namespace DB::FinalizeHelper -{ -void prependProjectInputIfNeed(ExpressionActionsPtr & actions, size_t columns_from_previous); - -void checkSchemaContainsParentRequire(const NamesAndTypes & schema, const Names & parent_require); - -void checkParentRequireContainsSchema(const Names & parent_require, const NamesAndTypes & schema); - -void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTypes & schema); - -void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block); -} // namespace DB::FinalizeHelper diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp new file mode 100644 index 00000000000..d8ef2e7f0c5 --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp @@ -0,0 +1,32 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace +{ +PhysicalPlanPtr popBack(std::vector vec) +{ + assert(!vec.empty()); + PhysicalPlanPtr back = vec.back(); + vec.pop_back(); + return back; +} +} // namespace + +void PhysicalPlanBuilder::buildSource(const Block & sample_block) +{ + cur_plans.push_back(PhysicalSource::build(source_sample_block, log.identifier())); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.h b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h new file mode 100644 index 00000000000..0731f3de887 --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h @@ -0,0 +1,47 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +class PhysicalPlanBuilder +{ +public: + explicit PhysicalPlanBuilder(Context & context_, const String & req_id) + : context(context_) + , log("PhysicalPlanBuilder", req_id) + {} + + void buildSource(const Block & sample_block); + + PhysicalPlanPtr getResult() const + { + RUNTIME_ASSERT(cur_plans.size() == 1, log, "There can only be one plan output, but here are {}", cur_plans.size()); + return cur_plans.back(); + } + +private: + std::vector cur_plans; + + Context & context; + + Logger log; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/PlanType.cpp b/dbms/src/Flash/Planner/PlanType.cpp index 0b733fd74e2..b766a890455 100644 --- a/dbms/src/Flash/Planner/PlanType.cpp +++ b/dbms/src/Flash/Planner/PlanType.cpp @@ -21,20 +21,8 @@ String toString(const PlanType & plan_type) { switch (plan_type) { - case Aggregation: - return "Aggregation"; - case ExchangeSender: - return "ExchangeSender"; - case Limit: - return "Limit"; - case Projection: - return "Projection"; - case Selection: - return "Selection"; case Source: return "Source"; - case TopN: - return "TopN"; default: throw Exception("Unknown PlanType"); } diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h index e9ab9cb6f67..47bd1d8316d 100644 --- a/dbms/src/Flash/Planner/PlanType.h +++ b/dbms/src/Flash/Planner/PlanType.h @@ -20,13 +20,7 @@ namespace DB { enum PlanType { - Aggregation, - ExchangeSender, - Limit, - Projection, - Selection, Source, - TopN, }; String toString(const PlanType & plan_type); diff --git a/dbms/src/Flash/Planner/Planner.cpp b/dbms/src/Flash/Planner/Planner.cpp index cd597f33998..ef0e3f4aeeb 100644 --- a/dbms/src/Flash/Planner/Planner.cpp +++ b/dbms/src/Flash/Planner/Planner.cpp @@ -14,8 +14,8 @@ #include #include -#include #include +#include #include #include @@ -45,7 +45,7 @@ BlockInputStreams Planner::execute() return pipeline.streams; } -bool Planner::isSupported(const DAGQueryBlock & query_block) +bool Planner::isSupported(const DAGQueryBlock &) { return false; } @@ -63,5 +63,15 @@ void Planner::restorePipelineConcurrency(DAGPipeline & pipeline) void Planner::executeImpl(DAGPipeline & pipeline) { + PhysicalPlanBuilder builder{context, log->identifier()}; + for (const auto & input_streams : input_streams_vec) + { + assert(!input_streams.empty()); + builder.buildSource(input_streams.back()->getHeader()); + } + + auto physical_plan = builder.getResult(); + physical_plan->finalize(); + physical_plan->transform(pipeline, context, max_streams); } } // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Planner/plans/PhysicalLeaf.h b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h new file mode 100644 index 00000000000..038c0316f20 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h @@ -0,0 +1,49 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ +/** + * A physical plan node with no children. + */ +class PhysicalLeaf : public PhysicalPlan +{ +public: + PhysicalLeaf(const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, const String & req_id) + : PhysicalPlan(executor_id_, type_, schema_, req_id) + {} + + PhysicalPlanPtr children(size_t) const override + { + throw Exception("the children size of PhysicalLeaf is zero"); + } + + void setChild(size_t, const PhysicalPlanPtr &) override + { + throw Exception("the children size of PhysicalLeaf is zero"); + } + + void appendChild(const PhysicalPlanPtr &) override + { + throw Exception("the children size of PhysicalLeaf is zero"); + } + + size_t childrenSize() const override { return 0; }; +}; +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Planner/plans/PhysicalSource.h b/dbms/src/Flash/Planner/plans/PhysicalSource.h new file mode 100644 index 00000000000..6b6837de107 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalSource.h @@ -0,0 +1,55 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ +class PhysicalSource : public PhysicalLeaf +{ +public: + static PhysicalPlanPtr build( + const Block & sample_block, + const String & req_id) + { + NamesAndTypes schema; + for (const auto & col : sample_block) + schema.emplace_back(col.name, col.type); + return std::make_shared("source", schema, sample_block, req_id); + } + + PhysicalSource( + const String & executor_id_, + const NamesAndTypes & schema_, + const Block & sample_block_, + const String & req_id) + : PhysicalLeaf(executor_id_, PlanType::Source, schema_, req_id) + , sample_block(sample_block_) + { + is_record_profile_streams = false; + } + + void transformImpl(DAGPipeline &, Context &, size_t) override {} + + void finalize(const Names &) override {} + + const Block & getSampleBlock() const override { return sample_block; } + +private: + Block sample_block; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalUnary.h b/dbms/src/Flash/Planner/plans/PhysicalUnary.h index 2b978b0f743..a5fab90ddbc 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalUnary.h +++ b/dbms/src/Flash/Planner/plans/PhysicalUnary.h @@ -1,3 +1,17 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #pragma once #include From 99c6a74b8a8688fa4ea6be593739cc850a690ca0 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 25 Apr 2022 22:59:56 +0800 Subject: [PATCH 03/20] fix --- dbms/src/Flash/Planner/PhysicalPlan.h | 4 ++-- dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp | 11 ----------- dbms/src/Flash/Planner/PhysicalPlanBuilder.h | 4 ++-- 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/dbms/src/Flash/Planner/PhysicalPlan.h b/dbms/src/Flash/Planner/PhysicalPlan.h index 49959cc10ad..6e07f7aeefa 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.h +++ b/dbms/src/Flash/Planner/PhysicalPlan.h @@ -38,7 +38,7 @@ class PhysicalPlan : executor_id(executor_id_) , type(type_) , schema(schema_) - , log(DB::toString(type_), req_id) + , log(Logger::get(DB::toString(type_), req_id)) {} virtual ~PhysicalPlan() = default; @@ -79,6 +79,6 @@ class PhysicalPlan NamesAndTypes schema; bool is_record_profile_streams = true; - Logger log; + LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp index d8ef2e7f0c5..a2cd2ee9839 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp @@ -14,17 +14,6 @@ namespace DB { -namespace -{ -PhysicalPlanPtr popBack(std::vector vec) -{ - assert(!vec.empty()); - PhysicalPlanPtr back = vec.back(); - vec.pop_back(); - return back; -} -} // namespace - void PhysicalPlanBuilder::buildSource(const Block & sample_block) { cur_plans.push_back(PhysicalSource::build(source_sample_block, log.identifier())); diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.h b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h index 0731f3de887..79ba07a5ab6 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanBuilder.h +++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h @@ -26,7 +26,7 @@ class PhysicalPlanBuilder public: explicit PhysicalPlanBuilder(Context & context_, const String & req_id) : context(context_) - , log("PhysicalPlanBuilder", req_id) + , log(Logger::get("PhysicalPlanBuilder", req_id)) {} void buildSource(const Block & sample_block); @@ -42,6 +42,6 @@ class PhysicalPlanBuilder Context & context; - Logger log; + LoggerPtr log; }; } // namespace DB From c6363c8b81581644b960b3116b33754fcb0cfdf2 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 25 Apr 2022 23:34:46 +0800 Subject: [PATCH 04/20] fix --- dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp index a2cd2ee9839..3cbeec61e16 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp @@ -1,16 +1,5 @@ -#include -#include #include -#include -#include -#include -#include -#include -#include -#include #include -#include -#include namespace DB { From bb44d1f5af7b865a0f1ae073f6ff27ec1b999676 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 25 Apr 2022 23:38:02 +0800 Subject: [PATCH 05/20] fix --- dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp | 2 +- dbms/src/Flash/Planner/PhysicalPlanBuilder.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp index 3cbeec61e16..520381216ce 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp @@ -5,6 +5,6 @@ namespace DB { void PhysicalPlanBuilder::buildSource(const Block & sample_block) { - cur_plans.push_back(PhysicalSource::build(source_sample_block, log.identifier())); + cur_plans.push_back(PhysicalSource::build(sample_block, log->identifier())); } } // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.h b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h index 79ba07a5ab6..bc97d84f5b3 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanBuilder.h +++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h @@ -40,7 +40,7 @@ class PhysicalPlanBuilder private: std::vector cur_plans; - Context & context; + [[maybe_unused]] Context & context; LoggerPtr log; }; From c51c1525263d5d23de4906addc24db4a69390ffe Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 10 May 2022 15:32:38 +0800 Subject: [PATCH 06/20] add optimize --- dbms/src/Flash/Planner/Planner.cpp | 3 ++- dbms/src/Flash/Planner/optimize.cpp | 38 +++++++++++++++++++++++++++++ dbms/src/Flash/Planner/optimize.h | 23 +++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 dbms/src/Flash/Planner/optimize.cpp create mode 100644 dbms/src/Flash/Planner/optimize.h diff --git a/dbms/src/Flash/Planner/Planner.cpp b/dbms/src/Flash/Planner/Planner.cpp index ef0e3f4aeeb..8b695a3aa92 100644 --- a/dbms/src/Flash/Planner/Planner.cpp +++ b/dbms/src/Flash/Planner/Planner.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include namespace DB @@ -71,7 +72,7 @@ void Planner::executeImpl(DAGPipeline & pipeline) } auto physical_plan = builder.getResult(); - physical_plan->finalize(); + optimize(context, physical_plan); physical_plan->transform(pipeline, context, max_streams); } } // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Planner/optimize.cpp b/dbms/src/Flash/Planner/optimize.cpp new file mode 100644 index 00000000000..46c527f92f3 --- /dev/null +++ b/dbms/src/Flash/Planner/optimize.cpp @@ -0,0 +1,38 @@ +// +// Created by root on 5/10/22. +// + +#include +#include + +namespace DB +{ +class Rule +{ +public: + virtual PhysicalPlanPtr apply(const Context & context, PhysicalPlanPtr plan) = 0; + + virtual ~Rule() = default; +}; +using RulePtr = std::shared_ptr; + +class FinalizeRule : public Rule +{ +public: + PhysicalPlanPtr apply(const Context &, PhysicalPlanPtr plan) override + { + plan->finalize(); + return plan; + } + + static RulePtr create() { return std::make_shared(); } +}; + +void optimize(const Context & context, PhysicalPlanPtr plan) +{ + assert(plan); + static std::vector rules{FinalizeRule::create()}; + for (const auto & rule : rules) + plan = rule->apply(context, plan); +} +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Planner/optimize.h b/dbms/src/Flash/Planner/optimize.h new file mode 100644 index 00000000000..03da563d1c0 --- /dev/null +++ b/dbms/src/Flash/Planner/optimize.h @@ -0,0 +1,23 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +class Context; +void optimize(const Context & context, PhysicalPlanPtr plan); +} // namespace DB From a5647fddcda8868ed0e61f51da11c035fa8a9879 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 10 May 2022 15:55:10 +0800 Subject: [PATCH 07/20] format --- dbms/src/Flash/Planner/optimize.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Planner/optimize.cpp b/dbms/src/Flash/Planner/optimize.cpp index 46c527f92f3..e1f2c127c0d 100644 --- a/dbms/src/Flash/Planner/optimize.cpp +++ b/dbms/src/Flash/Planner/optimize.cpp @@ -35,4 +35,4 @@ void optimize(const Context & context, PhysicalPlanPtr plan) for (const auto & rule : rules) plan = rule->apply(context, plan); } -} // namespace DB \ No newline at end of file +} // namespace DB From 39cc3ecd4086875f4acd685d47b188cf728e6eb2 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 10 May 2022 15:58:16 +0800 Subject: [PATCH 08/20] license --- dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp | 14 ++++++++++++++ dbms/src/Flash/Planner/PhysicalPlanHelper.cpp | 14 ++++++++++++++ dbms/src/Flash/Planner/PhysicalPlanHelper.h | 14 ++++++++++++++ dbms/src/Flash/Planner/optimize.cpp | 12 +++++++++++- 4 files changed, 53 insertions(+), 1 deletion(-) diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp index 520381216ce..b4037746ae5 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp @@ -1,3 +1,17 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #include #include diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp index 9d1fc4fee2c..456ea70101e 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp @@ -1,3 +1,17 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #include namespace DB::PhysicalPlanHelper diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.h b/dbms/src/Flash/Planner/PhysicalPlanHelper.h index 22ccf876eac..8a39921ec51 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanHelper.h +++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.h @@ -1,3 +1,17 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #pragma once #include diff --git a/dbms/src/Flash/Planner/optimize.cpp b/dbms/src/Flash/Planner/optimize.cpp index e1f2c127c0d..025c25fd9f1 100644 --- a/dbms/src/Flash/Planner/optimize.cpp +++ b/dbms/src/Flash/Planner/optimize.cpp @@ -1,6 +1,16 @@ +// Copyright 2022 PingCAP, Ltd. // -// Created by root on 5/10/22. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. #include #include From 3960244e87d7bad422637f18761c74c28ae37706 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 10 May 2022 16:18:08 +0800 Subject: [PATCH 09/20] plan type --- dbms/src/Flash/Planner/PhysicalPlan.cpp | 13 +++++++++++- dbms/src/Flash/Planner/PhysicalPlan.h | 11 +++++----- dbms/src/Flash/Planner/PlanType.cpp | 4 ++-- dbms/src/Flash/Planner/PlanType.h | 27 +++++++++++++++++++++---- 4 files changed, 42 insertions(+), 13 deletions(-) diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index 1d4f95f3716..73fcb839ae1 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -21,6 +21,17 @@ namespace DB { +PhysicalPlan::PhysicalPlan( + const String & executor_id_, + const PlanType & type_, + const NamesAndTypes & schema_, + const String & req_id) + : executor_id(executor_id_) + , type(type_) + , schema(schema_) + , log(Logger::get(type_.toString(), req_id)) +{} + String PhysicalPlan::toString() { auto schema_to_string = [&]() { @@ -34,7 +45,7 @@ String PhysicalPlan::toString() }; return fmt::format( "type: {}, executor_id: {}, is_record_profile_streams: {}, schema: {}", - DB::toString(type), + type.toString(), executor_id, is_record_profile_streams, schema_to_string()); diff --git a/dbms/src/Flash/Planner/PhysicalPlan.h b/dbms/src/Flash/Planner/PhysicalPlan.h index 6e07f7aeefa..8a69545f10b 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.h +++ b/dbms/src/Flash/Planner/PhysicalPlan.h @@ -34,12 +34,11 @@ using PhysicalPlanPtr = std::shared_ptr; class PhysicalPlan { public: - PhysicalPlan(const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, const String & req_id) - : executor_id(executor_id_) - , type(type_) - , schema(schema_) - , log(Logger::get(DB::toString(type_), req_id)) - {} + PhysicalPlan( + const String & executor_id_, + const PlanType & type_, + const NamesAndTypes & schema_, + const String & req_id); virtual ~PhysicalPlan() = default; diff --git a/dbms/src/Flash/Planner/PlanType.cpp b/dbms/src/Flash/Planner/PlanType.cpp index b766a890455..f4b685fbd93 100644 --- a/dbms/src/Flash/Planner/PlanType.cpp +++ b/dbms/src/Flash/Planner/PlanType.cpp @@ -17,9 +17,9 @@ namespace DB { -String toString(const PlanType & plan_type) +String PlanType::toString() const { - switch (plan_type) + switch (enum_value) { case Source: return "Source"; diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h index 47bd1d8316d..9e80b152652 100644 --- a/dbms/src/Flash/Planner/PlanType.h +++ b/dbms/src/Flash/Planner/PlanType.h @@ -18,10 +18,29 @@ namespace DB { -enum PlanType +struct PlanType { - Source, -}; + enum PlanTypeEnum + { + Source = 0x1, + }; + PlanTypeEnum enum_value; + + PlanType(int value = 0) // NOLINT(google-explicit-constructor) + : enum_value(static_cast(value)) + {} + + PlanType & operator=(int value) + { + this->enum_value = static_cast(value); + return *this; + } -String toString(const PlanType & plan_type); + operator int() const // NOLINT(google-explicit-constructor) + { + return this->enum_value; + } + + String toString() const; +}; } // namespace DB From 9b86c4194067f685ae58eae8a6ed8c5d20ba5b28 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 10 May 2022 17:39:47 +0800 Subject: [PATCH 10/20] u --- dbms/src/Flash/Planner/Planner.cpp | 2 +- dbms/src/Flash/Planner/Planner.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Planner/Planner.cpp b/dbms/src/Flash/Planner/Planner.cpp index 8b695a3aa92..5e234dd3365 100644 --- a/dbms/src/Flash/Planner/Planner.cpp +++ b/dbms/src/Flash/Planner/Planner.cpp @@ -75,4 +75,4 @@ void Planner::executeImpl(DAGPipeline & pipeline) optimize(context, physical_plan); physical_plan->transform(pipeline, context, max_streams); } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/Planner/Planner.h b/dbms/src/Flash/Planner/Planner.h index 7abe258a1aa..36ba918d780 100644 --- a/dbms/src/Flash/Planner/Planner.h +++ b/dbms/src/Flash/Planner/Planner.h @@ -57,4 +57,4 @@ class Planner LoggerPtr log; }; -} // namespace DB \ No newline at end of file +} // namespace DB From 4bde83b015f9e209c39260d4c803afb249a8a2fc Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 25 May 2022 14:40:08 +0800 Subject: [PATCH 11/20] license check --- .github/workflows/license-checker.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/license-checker.yml b/.github/workflows/license-checker.yml index e156c1b2b4c..c4c510677b1 100644 --- a/.github/workflows/license-checker.yml +++ b/.github/workflows/license-checker.yml @@ -3,10 +3,10 @@ name: License checker on: push: branches: - - master + - planner_refactory pull_request: branches: - - master + - planner_refactory jobs: check-license: From 2fc02b7ba3b9f8806d52661f9fdef478eb0ef880 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 26 May 2022 15:29:32 +0800 Subject: [PATCH 12/20] add tests --- dbms/src/Flash/tests/gtest_interpreter.cpp | 2 + dbms/src/Flash/tests/gtest_planner.cpp | 388 +++++++++++++++++++++ dbms/src/Interpreters/Settings.h | 2 +- 3 files changed, 391 insertions(+), 1 deletion(-) create mode 100644 dbms/src/Flash/tests/gtest_planner.cpp diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index aed9d9e90f9..f21f0701e90 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -26,6 +26,8 @@ class InterpreterExecuteTest : public DB::tests::InterpreterTest { InterpreterTest::initializeContext(); + context.context->setSetting("enable_planner", "false"); + context.addMockTable({"test_db", "test_table"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}); context.addMockTable({"test_db", "test_table_1"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); context.addMockTable({"test_db", "r_table"}, {{"r_a", TiDB::TP::TypeLong}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); diff --git a/dbms/src/Flash/tests/gtest_planner.cpp b/dbms/src/Flash/tests/gtest_planner.cpp new file mode 100644 index 00000000000..c2742080a59 --- /dev/null +++ b/dbms/src/Flash/tests/gtest_planner.cpp @@ -0,0 +1,388 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB::tests +{ +class PlannerTest : public DB::tests::InterpreterTest +{ +public: + void initializeContext() override + { + InterpreterTest::initializeContext(); + + context.context->setSetting("enable_planner", "true"); + + context.addMockTable({"test_db", "test_table"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}); + context.addMockTable({"test_db", "test_table_1"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); + context.addMockTable({"test_db", "r_table"}, {{"r_a", TiDB::TP::TypeLong}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + context.addMockTable({"test_db", "l_table"}, {{"l_a", TiDB::TP::TypeLong}, {"l_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + context.addExchangeRelationSchema("sender_1", {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); + context.addExchangeRelationSchema("sender_l", {{"l_a", TiDB::TP::TypeString}, {"l_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + context.addExchangeRelationSchema("sender_r", {{"r_a", TiDB::TP::TypeString}, {"r_b", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}); + } +}; + +TEST_F(PlannerTest, SingleQueryBlock) +try +{ + auto request = context.scan("test_db", "test_table_1") + .filter(eq(col("s2"), col("s3"))) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .filter(eq(col("s2"), col("s3"))) + .topN("s2", false, 10) + .build(context); + { + String expected = R"( +Union: + SharedQuery x 10: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Filter: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.scan("test_db", "test_table_1") + .filter(eq(col("s2"), col("s3"))) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .filter(eq(col("s2"), col("s3"))) + .limit(10) + .build(context); + + { + String expected = R"( +Union: + SharedQuery x 10: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + Expression: + Filter: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +TEST_F(PlannerTest, MultipleQueryBlockWithSource) +try +{ + auto request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .project({"s1", "s2"}) + .project("s1") + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .topN({{"s1", true}, {"s2", false}}, 10) + .project({"s1", "s2"}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .topN({{"s1", true}, {"s2", false}}, 10) + .project({"s1", "s2"}) + .aggregation({Max(col("s1"))}, {col("s1"), col("s2")}) + .project({"max(s1)", "s1", "s2"}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .topN({{"s1", true}, {"s2", false}}, 10) + .project({"s1", "s2"}) + .aggregation({Max(col("s1"))}, {col("s1"), col("s2")}) + .project({"max(s1)", "s1", "s2"}) + .filter(eq(col("s1"), col("s2"))) + .project({"max(s1)", "s1"}) + .limit(10) + .build(context); + { + String expected = R"( +Union: + SharedQuery x 10: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + Expression: + Expression: + Expression: + Expression: + Filter: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + // Join Source. + DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + DAGRequestBuilder table3 = context.scan("test_db", "r_table"); + DAGRequestBuilder table4 = context.scan("test_db", "l_table"); + + request = table1.join( + table2.join( + table3.join(table4, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockTableScan + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockTableScan + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.receive("sender_1") + .project({"s1", "s2", "s3"}) + .project({"s1", "s2"}) + .project("s1") + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + request = context.receive("sender_1") + .project({"s1", "s2", "s3"}) + .project({"s1", "s2"}) + .project("s1") + .exchangeSender(tipb::Broadcast) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + // only join + ExchangeReceiver + DAGRequestBuilder receiver1 = context.receive("sender_l"); + DAGRequestBuilder receiver2 = context.receive("sender_r"); + DAGRequestBuilder receiver3 = context.receive("sender_l"); + DAGRequestBuilder receiver4 = context.receive("sender_r"); + + request = receiver1.join( + receiver2.join( + receiver3.join(receiver4, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockExchangeReceiver + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + // join + receiver + sender + // TODO: Find a way to write the request easier. + DAGRequestBuilder receiver5 = context.receive("sender_l"); + DAGRequestBuilder receiver6 = context.receive("sender_r"); + DAGRequestBuilder receiver7 = context.receive("sender_l"); + DAGRequestBuilder receiver8 = context.receive("sender_r"); + request = receiver5.join( + receiver6.join( + receiver7.join(receiver8, + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left), + {col("join_c")}, + ASTTableJoin::Kind::Left) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + MockExchangeReceiver + Union x 2: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver + Union: + MockExchangeSender x 10 + Expression: + Expression: + HashJoinProbe: + Expression: + MockExchangeReceiver)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +} \ No newline at end of file diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 44786ded44e..b87b8ccc3d8 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -364,7 +364,7 @@ struct Settings M(SettingUInt64, manual_compact_pool_size, 1, "The number of worker threads to handle manual compact requests.") \ M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \ M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") \ - M(SettingBool, enable_planner, true, "Enable planner") + M(SettingBool, enable_planner, true, "Enable planner") // clang-format on #define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; From 6a91f869c5b75e80fd8f66603fe7f5a0132a52d6 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 26 May 2022 15:46:08 +0800 Subject: [PATCH 13/20] f --- dbms/src/Flash/tests/gtest_interpreter.cpp | 2 +- dbms/src/Flash/tests/gtest_planner.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index f21f0701e90..7ddbc975515 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -26,7 +26,7 @@ class InterpreterExecuteTest : public DB::tests::InterpreterTest { InterpreterTest::initializeContext(); - context.context->setSetting("enable_planner", "false"); + context.context.setSetting("enable_planner", "false"); context.addMockTable({"test_db", "test_table"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}); context.addMockTable({"test_db", "test_table_1"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); diff --git a/dbms/src/Flash/tests/gtest_planner.cpp b/dbms/src/Flash/tests/gtest_planner.cpp index c2742080a59..754df6993d0 100644 --- a/dbms/src/Flash/tests/gtest_planner.cpp +++ b/dbms/src/Flash/tests/gtest_planner.cpp @@ -24,7 +24,7 @@ class PlannerTest : public DB::tests::InterpreterTest { InterpreterTest::initializeContext(); - context.context->setSetting("enable_planner", "true"); + context.context.setSetting("enable_planner", "true"); context.addMockTable({"test_db", "test_table"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}); context.addMockTable({"test_db", "test_table_1"}, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); @@ -385,4 +385,4 @@ CreatingSets } CATCH -} \ No newline at end of file +} // namespace DB::tests \ No newline at end of file From dcaa84abf53749f2a614a34c985b9ef2f6be4fc2 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Tue, 31 May 2022 11:38:02 +0800 Subject: [PATCH 14/20] Errors::Planner --- dbms/src/Common/TiFlashException.h | 17 +++++++++++++++++ dbms/src/Flash/Planner/PlanType.cpp | 4 ++-- dbms/src/Flash/Planner/plans/PhysicalLeaf.h | 8 ++++---- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/dbms/src/Common/TiFlashException.h b/dbms/src/Common/TiFlashException.h index 2026571859e..d84615de932 100644 --- a/dbms/src/Common/TiFlashException.h +++ b/dbms/src/Common/TiFlashException.h @@ -96,6 +96,23 @@ namespace DB "This error usually occurs when the TiFlash server is busy or the TiFlash node is down.\n", \ ""); \ ) \ + C(Planner, \ + E(BadRequest, "Bad TiDB DAGRequest.", \ + "This error is usually caused by incorrect TiDB DAGRequest. \n" \ + "Please contact with developer, \n" \ + "better providing information about your cluster(log, topology information etc.).", \ + ""); \ + E(Unimplemented, "Some features are unimplemented.", \ + "This error may caused by unmatched TiDB and TiFlash versions, \n" \ + "and should not occur in common case. \n" \ + "Please contact with developer, \n" \ + "better providing information about your cluster(log, topology information etc.).", \ + ""); \ + E(Internal, "TiFlash Planner internal error.", \ + "Please contact with developer, \n" \ + "better providing information about your cluster(log, topology information etc.).", \ + ""); \ + ) \ C(Table, \ E(SchemaVersionError, "Schema version of target table in TiFlash is different from that in query.", \ "TiFlash will sync the newest schema from TiDB before processing every query. \n" \ diff --git a/dbms/src/Flash/Planner/PlanType.cpp b/dbms/src/Flash/Planner/PlanType.cpp index f4b685fbd93..131a9c13b3a 100644 --- a/dbms/src/Flash/Planner/PlanType.cpp +++ b/dbms/src/Flash/Planner/PlanType.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include namespace DB @@ -24,7 +24,7 @@ String PlanType::toString() const case Source: return "Source"; default: - throw Exception("Unknown PlanType"); + throw TiFlashException("Unknown PlanType", Errors::Planner::Internal); } } } // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalLeaf.h b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h index 038c0316f20..f281ce7f1e0 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalLeaf.h +++ b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include namespace DB @@ -31,17 +31,17 @@ class PhysicalLeaf : public PhysicalPlan PhysicalPlanPtr children(size_t) const override { - throw Exception("the children size of PhysicalLeaf is zero"); + throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal); } void setChild(size_t, const PhysicalPlanPtr &) override { - throw Exception("the children size of PhysicalLeaf is zero"); + throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal); } void appendChild(const PhysicalPlanPtr &) override { - throw Exception("the children size of PhysicalLeaf is zero"); + throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal); } size_t childrenSize() const override { return 0; }; From 64b6d2c0264440b65823fdce860156a0c1555a99 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 8 Jun 2022 17:23:54 +0800 Subject: [PATCH 15/20] add tests and test --- ...nner.cpp => gtest_planner_interpreter.cpp} | 184 ++++++++++++++++-- dbms/src/TestUtils/ExecutorTestUtils.cpp | 4 +- 2 files changed, 169 insertions(+), 19 deletions(-) rename dbms/src/Flash/tests/{gtest_planner.cpp => gtest_planner_interpreter.cpp} (75%) diff --git a/dbms/src/Flash/tests/gtest_planner.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp similarity index 75% rename from dbms/src/Flash/tests/gtest_planner.cpp rename to dbms/src/Flash/tests/gtest_planner_interpreter.cpp index 754df6993d0..a1a39d444ca 100644 --- a/dbms/src/Flash/tests/gtest_planner.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include -namespace DB::tests +namespace DB { -class PlannerTest : public DB::tests::InterpreterTest +namespace tests +{ +class PlannerInterpreterExecuteTest : public DB::tests::ExecutorTest { public: void initializeContext() override { - InterpreterTest::initializeContext(); + ExecutorTest::initializeContext(); context.context.setSetting("enable_planner", "true"); @@ -36,7 +38,7 @@ class PlannerTest : public DB::tests::InterpreterTest } }; -TEST_F(PlannerTest, SingleQueryBlock) +TEST_F(PlannerInterpreterExecuteTest, SimpleQuery) try { auto request = context.scan("test_db", "test_table_1") @@ -47,7 +49,7 @@ try .build(context); { String expected = R"( -Union: +Union: SharedQuery x 10: Expression: MergeSorting, limit = 10 @@ -72,7 +74,7 @@ Union: { String expected = R"( -Union: +Union: SharedQuery x 10: Limit, limit = 10 Union: @@ -90,7 +92,7 @@ Union: } CATCH -TEST_F(PlannerTest, MultipleQueryBlockWithSource) +TEST_F(PlannerInterpreterExecuteTest, ComplexQuery) try { auto request = context.scan("test_db", "test_table_1") @@ -100,7 +102,7 @@ try .build(context); { String expected = R"( -Union: +Union: Expression x 10: Expression: Expression: @@ -122,7 +124,7 @@ Union: .build(context); { String expected = R"( -Union: +Union: Expression x 10: Expression: Expression: @@ -147,7 +149,7 @@ Union: .build(context); { String expected = R"( -Union: +Union: Expression x 10: Expression: Expression: @@ -181,7 +183,7 @@ Union: .build(context); { String expected = R"( -Union: +Union: SharedQuery x 10: Limit, limit = 10 Union: @@ -244,7 +246,7 @@ CreatingSets HashJoinProbe: Expression: MockTableScan - Union: + Union: Expression x 10: Expression: HashJoinProbe: @@ -260,7 +262,7 @@ CreatingSets .build(context); { String expected = R"( -Union: +Union: Expression x 10: Expression: Expression: @@ -283,7 +285,7 @@ Union: .build(context); { String expected = R"( -Union: +Union: MockExchangeSender x 10 Expression: Expression: @@ -331,7 +333,7 @@ CreatingSets HashJoinProbe: Expression: MockExchangeReceiver - Union: + Union: Expression x 10: Expression: HashJoinProbe: @@ -373,7 +375,7 @@ CreatingSets HashJoinProbe: Expression: MockExchangeReceiver - Union: + Union: MockExchangeSender x 10 Expression: Expression: @@ -385,4 +387,150 @@ CreatingSets } CATCH -} // namespace DB::tests \ No newline at end of file +TEST_F(PlannerInterpreterExecuteTest, ParallelQuery) +try +{ + /// executor with table scan + auto request = context.scan("test_db", "test_table_1") + .limit(10) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .filter(eq(col("s2"), col("s3"))) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + /// other cases + request = context.scan("test_db", "test_table_1") + .limit(10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .project({"s2", "s3"}) + .aggregation({Max(col("s2"))}, {col("s3")}) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .limit(10) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"()"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + request = table1.join(table2.limit(1), {col("join_c")}, ASTTableJoin::Kind::Left).build(context); + { + String expected; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +} // namespace tests +} // namespace DB \ No newline at end of file diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 67a21d12286..3347246553b 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -62,7 +62,9 @@ void ExecutorTest::executeInterpreter(const String & expected_string, const std: auto res = executeQuery(dag, context.context, false, QueryProcessingStage::Complete); FmtBuffer fb; res.in->dumpTree(fb); - ASSERT_EQ(Poco::trim(expected_string), Poco::trim(fb.toString())); +// ASSERT_EQ(Poco::trim(expected_string), Poco::trim(fb.toString())); + std::cout << "\n\n" << expected_string << std::endl; + std::cout << Poco::trim(fb.toString()) << std::endl; } namespace From ca45abfe044887c73eea2d5192edf53a655998d6 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 8 Jun 2022 18:17:15 +0800 Subject: [PATCH 16/20] add parallel tests --- .../Flash/tests/gtest_planner_interpreter.cpp | 281 ++++++++++++--- ...terpreter.cpp => gtest_qb_interpreter.cpp} | 324 +++++++++++++++++- dbms/src/TestUtils/ExecutorTestUtils.cpp | 4 +- 3 files changed, 549 insertions(+), 60 deletions(-) rename dbms/src/Flash/tests/{gtest_interpreter.cpp => gtest_qb_interpreter.cpp} (60%) diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp index a1a39d444ca..acb5ae0d2c9 100644 --- a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -392,133 +392,290 @@ try { /// executor with table scan auto request = context.scan("test_db", "test_table_1") - .limit(10) - .build(context); + .limit(10) + .build(context); { - String expected; + String expected = R"( +Limit, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); - expected = R"()"; + expected = R"( +Union: + SharedQuery x 5: + Limit, limit = 10 + Union: + Limit x 5, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); } request = context.scan("test_db", "test_table_1") - .project({"s1", "s2", "s3"}) - .build(context); + .project({"s1", "s2", "s3"}) + .build(context); { - String expected; + String expected = R"( +Expression: + Expression: + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); - expected = R"()"; + expected = R"( +Union: + Expression x 5: + Expression: + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); } request = context.scan("test_db", "test_table_1") - .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) - .build(context); + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); { - String expected; + String expected = R"( +Expression: + Aggregating + Concat + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); - expected = R"()"; + expected = R"( +Union: + Expression x 5: + SharedQuery: + ParallelAggregating, max_threads: 5, final: true + Expression x 5: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); } request = context.scan("test_db", "test_table_1") - .topN("s2", false, 10) - .build(context); + .topN("s2", false, 10) + .build(context); { - String expected; + String expected = R"( +Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); - expected = R"()"; + expected = R"( +Union: + SharedQuery x 5: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 5: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); } request = context.scan("test_db", "test_table_1") - .filter(eq(col("s2"), col("s3"))) - .build(context); + .filter(eq(col("s2"), col("s3"))) + .build(context); { - String expected; + String expected = R"( +Expression: + Expression: + Filter: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); - expected = R"()"; + expected = R"( +Union: + Expression x 5: + Expression: + Filter: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); } /// other cases request = context.scan("test_db", "test_table_1") - .limit(10) - .project({"s1", "s2", "s3"}) - .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) - .build(context); + .limit(10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); { - String expected; + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - expected = R"()"; + expected = R"(Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Limit, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } request = context.scan("test_db", "test_table_1") - .topN("s2", false, 10) - .project({"s1", "s2", "s3"}) - .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) - .build(context); + .topN("s2", false, 10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); { - String expected; + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - expected = R"()"; + expected = R"( +Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } request = context.scan("test_db", "test_table_1") - .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) - .project({"s2", "s3"}) - .aggregation({Max(col("s2"))}, {col("s3")}) - .build(context); + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .project({"s2", "s3"}) + .aggregation({Max(col("s2"))}, {col("s3")}) + .build(context); { - String expected; + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - expected = R"()"; + expected = R"( +Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Expression: + Aggregating + Concat + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } request = context.scan("test_db", "test_table_1") - .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) - .exchangeSender(tipb::PassThrough) - .build(context); + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .exchangeSender(tipb::PassThrough) + .build(context); { - String expected; + String expected = R"( +Union: + MockExchangeSender x 10 + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - expected = R"()"; + expected = R"( +MockExchangeSender + Expression: + Aggregating + Concat + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } request = context.scan("test_db", "test_table_1") - .topN("s2", false, 10) - .exchangeSender(tipb::PassThrough) - .build(context); + .topN("s2", false, 10) + .exchangeSender(tipb::PassThrough) + .build(context); { - String expected; + String expected = R"( +Union: + MockExchangeSender x 10 + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - expected = R"()"; + expected = R"( +MockExchangeSender + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } request = context.scan("test_db", "test_table_1") - .limit(10) - .exchangeSender(tipb::PassThrough) - .build(context); + .limit(10) + .exchangeSender(tipb::PassThrough) + .build(context); { - String expected; + String expected = R"( +Union: + MockExchangeSender x 10 + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - expected = R"()"; + expected = R"( +MockExchangeSender + Limit, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -526,7 +683,23 @@ try DAGRequestBuilder table2 = context.scan("test_db", "l_table"); request = table1.join(table2.limit(1), {col("join_c")}, ASTTableJoin::Kind::Left).build(context); { - String expected; + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + SharedQuery: + Limit, limit = 1 + Union: + Limit x 10, limit = 1 + Expression: + MockTableScan + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } } diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_qb_interpreter.cpp similarity index 60% rename from dbms/src/Flash/tests/gtest_interpreter.cpp rename to dbms/src/Flash/tests/gtest_qb_interpreter.cpp index c59a6c7a085..c8ac422fdb3 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_qb_interpreter.cpp @@ -19,7 +19,7 @@ namespace DB { namespace tests { -class InterpreterExecuteTest : public DB::tests::ExecutorTest +class QBInterpreterExecuteTest : public DB::tests::ExecutorTest { public: void initializeContext() override @@ -38,7 +38,7 @@ class InterpreterExecuteTest : public DB::tests::ExecutorTest } }; -TEST_F(InterpreterExecuteTest, SingleQueryBlock) +TEST_F(QBInterpreterExecuteTest, SingleQueryBlock) try { auto request = context.scan("test_db", "test_table_1") @@ -92,7 +92,7 @@ Union: } CATCH -TEST_F(InterpreterExecuteTest, MultipleQueryBlockWithSource) +TEST_F(QBInterpreterExecuteTest, MultipleQueryBlockWithSource) try { auto request = context.scan("test_db", "test_table_1") @@ -387,5 +387,323 @@ CreatingSets } CATCH +TEST_F(QBInterpreterExecuteTest, ParallelQuery) +try +{ + /// executor with table scan + auto request = context.scan("test_db", "test_table_1") + .limit(10) + .build(context); + { + String expected = R"( +Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + SharedQuery x 5: + Limit, limit = 10 + Union: + Limit x 5, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .project({"s1", "s2", "s3"}) + .build(context); + { + String expected = R"( +Expression: + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + Expression: + Expression: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + SharedQuery: + ParallelAggregating, max_threads: 5, final: true + Expression x 5: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .build(context); + { + String expected = R"( +Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + SharedQuery x 5: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 5: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + request = context.scan("test_db", "test_table_1") + .filter(eq(col("s2"), col("s3"))) + .build(context); + { + String expected = R"( +Expression: + Expression: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + + expected = R"( +Union: + Expression x 5: + Expression: + Filter: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); + } + + /// other cases + request = context.scan("test_db", "test_table_1") + .limit(10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"(Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .project({"s1", "s2", "s3"}) + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .project({"s2", "s3"}) + .aggregation({Max(col("s2"))}, {col("s3")}) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +Expression: + Aggregating + Concat + Expression: + Expression: + Expression: + Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Expression: + Aggregating + Concat + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .topN("s2", false, 10) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + request = context.scan("test_db", "test_table_1") + .limit(10) + .exchangeSender(tipb::PassThrough) + .build(context); + { + String expected = R"( +Union: + MockExchangeSender x 10 + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + + expected = R"( +MockExchangeSender + Limit, limit = 10 + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + + DAGRequestBuilder table1 = context.scan("test_db", "r_table"); + DAGRequestBuilder table2 = context.scan("test_db", "l_table"); + request = table1.join(table2.limit(1), {col("join_c")}, ASTTableJoin::Kind::Left).build(context); + { + String expected = R"( +CreatingSets + Union: + HashJoinBuildBlockInputStream x 10: , join_kind = Left + Expression: + SharedQuery: + Limit, limit = 1 + Union: + Limit x 10, limit = 1 + Expression: + MockTableScan + Union: + Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + } // namespace tests } // namespace DB \ No newline at end of file diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 3347246553b..67a21d12286 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -62,9 +62,7 @@ void ExecutorTest::executeInterpreter(const String & expected_string, const std: auto res = executeQuery(dag, context.context, false, QueryProcessingStage::Complete); FmtBuffer fb; res.in->dumpTree(fb); -// ASSERT_EQ(Poco::trim(expected_string), Poco::trim(fb.toString())); - std::cout << "\n\n" << expected_string << std::endl; - std::cout << Poco::trim(fb.toString()) << std::endl; + ASSERT_EQ(Poco::trim(expected_string), Poco::trim(fb.toString())); } namespace From 5a95ca0550994fcf31fe6b8be38772387ad3e506 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 8 Jun 2022 18:33:04 +0800 Subject: [PATCH 17/20] fix --- dbms/src/Flash/Planner/Planner.cpp | 4 ++-- dbms/src/Flash/Planner/optimize.cpp | 3 ++- dbms/src/Flash/Planner/optimize.h | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Planner/Planner.cpp b/dbms/src/Flash/Planner/Planner.cpp index 5e234dd3365..3ccfc1234d3 100644 --- a/dbms/src/Flash/Planner/Planner.cpp +++ b/dbms/src/Flash/Planner/Planner.cpp @@ -67,12 +67,12 @@ void Planner::executeImpl(DAGPipeline & pipeline) PhysicalPlanBuilder builder{context, log->identifier()}; for (const auto & input_streams : input_streams_vec) { - assert(!input_streams.empty()); + RUNTIME_ASSERT(!input_streams.empty(), log, "input streams cannot be empty"); builder.buildSource(input_streams.back()->getHeader()); } auto physical_plan = builder.getResult(); - optimize(context, physical_plan); + physical_plan = optimize(context, physical_plan); physical_plan->transform(pipeline, context, max_streams); } } // namespace DB diff --git a/dbms/src/Flash/Planner/optimize.cpp b/dbms/src/Flash/Planner/optimize.cpp index 025c25fd9f1..244ddd534b6 100644 --- a/dbms/src/Flash/Planner/optimize.cpp +++ b/dbms/src/Flash/Planner/optimize.cpp @@ -38,11 +38,12 @@ class FinalizeRule : public Rule static RulePtr create() { return std::make_shared(); } }; -void optimize(const Context & context, PhysicalPlanPtr plan) +PhysicalPlanPtr optimize(const Context & context, PhysicalPlanPtr plan) { assert(plan); static std::vector rules{FinalizeRule::create()}; for (const auto & rule : rules) plan = rule->apply(context, plan); + return plan; } } // namespace DB diff --git a/dbms/src/Flash/Planner/optimize.h b/dbms/src/Flash/Planner/optimize.h index 03da563d1c0..8ba738c9f77 100644 --- a/dbms/src/Flash/Planner/optimize.h +++ b/dbms/src/Flash/Planner/optimize.h @@ -19,5 +19,5 @@ namespace DB { class Context; -void optimize(const Context & context, PhysicalPlanPtr plan); +PhysicalPlanPtr optimize(const Context & context, PhysicalPlanPtr plan); } // namespace DB From 6f9dc619748f93756b6f346c4924b614835d9861 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 9 Jun 2022 11:13:35 +0800 Subject: [PATCH 18/20] format --- dbms/src/Flash/Planner/plans/PhysicalLeaf.h | 6 +++++- dbms/src/Flash/Planner/plans/PhysicalUnary.h | 12 ++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/dbms/src/Flash/Planner/plans/PhysicalLeaf.h b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h index f281ce7f1e0..50ced412c13 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalLeaf.h +++ b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h @@ -25,7 +25,11 @@ namespace DB class PhysicalLeaf : public PhysicalPlan { public: - PhysicalLeaf(const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, const String & req_id) + PhysicalLeaf( + const String & executor_id_, + const PlanType & type_, + const NamesAndTypes & schema_, + const String & req_id) : PhysicalPlan(executor_id_, type_, schema_, req_id) {} diff --git a/dbms/src/Flash/Planner/plans/PhysicalUnary.h b/dbms/src/Flash/Planner/plans/PhysicalUnary.h index a5fab90ddbc..4d0091bb8e3 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalUnary.h +++ b/dbms/src/Flash/Planner/plans/PhysicalUnary.h @@ -27,20 +27,24 @@ namespace DB class PhysicalUnary : public PhysicalPlan { public: - PhysicalUnary(const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, const String & req_id) + PhysicalUnary( + const String & executor_id_, + const PlanType & type_, + const NamesAndTypes & schema_, + const String & req_id) : PhysicalPlan(executor_id_, type_, schema_, req_id) {} PhysicalPlanPtr children(size_t i) const override { - RUNTIME_ASSERT(i == 0, log, fmt::format("child_index({}) should not >= childrenSize({})", i, childrenSize())); + RUNTIME_ASSERT(i == 0, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); assert(child); return child; } void setChild(size_t i, const PhysicalPlanPtr & new_child) override { - RUNTIME_ASSERT(i == 0, log, fmt::format("child_index({}) should not >= childrenSize({})", i, childrenSize())); + RUNTIME_ASSERT(i == 0, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); assert(new_child); assert(new_child.get() != this); child = new_child; @@ -48,7 +52,7 @@ class PhysicalUnary : public PhysicalPlan void appendChild(const PhysicalPlanPtr & new_child) override { - RUNTIME_ASSERT(!child, log, fmt::format("the actual children size had be the max size({}), don't append child again", childrenSize())); + RUNTIME_ASSERT(!child, log, "the actual children size had be the max size({}), don't append child again", childrenSize()); assert(new_child); assert(new_child.get() != this); child = new_child; From edf26bce64914a550e3beb1e698ee2e69cbdde45 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Thu, 9 Jun 2022 11:55:21 +0800 Subject: [PATCH 19/20] address comment --- dbms/src/Flash/Planner/Planner.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/Planner/Planner.h b/dbms/src/Flash/Planner/Planner.h index 36ba918d780..8941420caf9 100644 --- a/dbms/src/Flash/Planner/Planner.h +++ b/dbms/src/Flash/Planner/Planner.h @@ -52,7 +52,7 @@ class Planner const DAGQueryBlock & query_block; - /// How many streams we ask for storage to produce, and in how many threads we will do further processing. + /// How many streams we will do processing. size_t max_streams = 1; LoggerPtr log; From d31b310c52172d3f82ca4e5bb347ad0d42c3f9d1 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Fri, 10 Jun 2022 15:23:21 +0800 Subject: [PATCH 20/20] address comments --- dbms/src/Flash/Planner/PlanType.h | 2 +- dbms/src/Flash/Planner/Planner.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h index 9e80b152652..9a5f26a497b 100644 --- a/dbms/src/Flash/Planner/PlanType.h +++ b/dbms/src/Flash/Planner/PlanType.h @@ -22,7 +22,7 @@ struct PlanType { enum PlanTypeEnum { - Source = 0x1, + Source = 0, }; PlanTypeEnum enum_value; diff --git a/dbms/src/Flash/Planner/Planner.h b/dbms/src/Flash/Planner/Planner.h index 8941420caf9..482d9cc5d76 100644 --- a/dbms/src/Flash/Planner/Planner.h +++ b/dbms/src/Flash/Planner/Planner.h @@ -52,7 +52,7 @@ class Planner const DAGQueryBlock & query_block; - /// How many streams we will do processing. + /// Max streams we will do processing. size_t max_streams = 1; LoggerPtr log;