Planner: planner without DAGQueryBlock (#5381)
ref #4739
SeaRise authored Aug 2, 2022
1 parent d9bcbde commit 0f26fe8
Showing 44 changed files with 915 additions and 615 deletions.
6 changes: 2 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -19,13 +19,12 @@
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <pingcap/Exception.h>
@@ -89,10 +88,9 @@ void DAGDriver<batch>::execute()
try
{
auto start_time = Clock::now();
DAGQuerySource dag(context);
DAGContext & dag_context = *context.getDAGContext();

BlockIO streams = executeQuery(dag, context, internal, QueryProcessingStage::Complete);
BlockIO streams = executeQuery(context, internal, QueryProcessingStage::Complete);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);
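A minimal declaration sketch of the new Flash-level entry point: the call above (and the one in MPPTask.cpp below) now goes through Flash/executeQuery.h instead of Interpreters/executeQuery.h and no longer takes a DAGQuerySource. The signature below is inferred only from these call sites — executeQuery(context, internal, QueryProcessingStage::Complete) and executeQuery(*context) — so the parameter names and default arguments are assumptions, not taken from this commit.

// Hypothetical sketch of dbms/src/Flash/executeQuery.h, reconstructed from the call sites in this diff.
#pragma once

#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>

namespace DB
{
class Context;

// Plans and returns the BlockIO for the DAG request held by context.getDAGContext().
BlockIO executeQuery(
    Context & context,
    bool internal = false,
    QueryProcessingStage::Enum stage = QueryProcessingStage::Complete);
} // namespace DB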
45 changes: 7 additions & 38 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
@@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Planner/Planner.h>
#include <Interpreters/Context.h>

namespace DB
@@ -55,49 +53,20 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block)
BlockInputStreams child_streams = executeQueryBlock(*child);
input_streams_vec.push_back(child_streams);
}
if (context.getSettingsRef().enable_planner && Planner::isSupported(query_block))
{
LOG_FMT_DEBUG(dagContext().log, "use planer for query block with source {}", query_block.source_name);
Planner planner(
context,
input_streams_vec,
query_block,
max_streams);
return planner.execute();
}
else
{
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
}
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
}

BlockIO InterpreterDAG::execute()
{
BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock());
DAGPipeline pipeline;
pipeline.streams = streams;
/// add union to run in parallel if needed
if (unlikely(dagContext().isTest()))
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for test");
else if (dagContext().isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true, "for mpp");
else
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/false, "for non mpp");
if (dagContext().hasSubquery())
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(dagContext().moveSubqueries()),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
dagContext().log->identifier());
}
executeCreatingSets(pipeline, context, max_streams, dagContext().log);
BlockIO res;
res.in = pipeline.firstStream();
return res;
27 changes: 27 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
@@ -165,4 +166,30 @@ void orderStreams(
log->identifier());
}
}

void executeCreatingSets(
DAGPipeline & pipeline,
const Context & context,
size_t max_streams,
const LoggerPtr & log)
{
DAGContext & dag_context = *context.getDAGContext();
/// add union to run in parallel if needed
if (unlikely(dag_context.isTest()))
executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for test");
else if (dag_context.isMPPTask())
/// MPPTask does not need the returned blocks.
executeUnion(pipeline, max_streams, log, /*ignore_block=*/true, "for mpp");
else
executeUnion(pipeline, max_streams, log, /*ignore_block=*/false, "for non mpp");
if (dag_context.hasSubquery())
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(dag_context.moveSubqueries()),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
log->identifier());
}
}
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
@@ -60,4 +60,10 @@ void orderStreams(
bool enable_fine_grained_shuffle,
const Context & context,
const LoggerPtr & log);

void executeCreatingSets(
DAGPipeline & pipeline,
const Context & context,
size_t max_streams,
const LoggerPtr & log);
} // namespace DB
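For readers looking only at this header: executeCreatingSets bundles the union stage and the CreatingSetsBlockInputStream wrapping that InterpreterDAG::execute previously did inline. A minimal sketch of the caller side, mirroring the InterpreterDAG.cpp hunk above; it assumes the interpreter has already produced the input streams and that the Context owns a DAGContext.

// Sketch of the calling pattern (as in InterpreterDAG::execute above); no new API beyond executeCreatingSets.
DAGPipeline pipeline;
pipeline.streams = executeQueryBlock(*dag.getRootQueryBlock());
// Unions the streams for parallel execution and, if subqueries exist, wraps the first stream in CreatingSetsBlockInputStream.
executeCreatingSets(pipeline, context, max_streams, dagContext().log);
BlockIO res;
res.in = pipeline.firstStream();
return res;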
5 changes: 2 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -30,8 +30,8 @@
#include <Flash/Mpp/MinTSOScheduler.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Flash/executeQuery.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <fmt/core.h>
@@ -318,8 +318,7 @@ void MPPTask::preprocess()
{
auto start_time = Clock::now();
initExchangeReceivers();
DAGQuerySource dag(*context);
executeQuery(dag, *context, false, QueryProcessingStage::Complete);
executeQuery(*context);
auto end_time = Clock::now();
dag_context->compile_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
mpp_task_statistics.setCompileTimestamp(start_time, end_time);
23 changes: 4 additions & 19 deletions dbms/src/Flash/Planner/FinalizeHelper.cpp
@@ -118,28 +118,13 @@ void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTy
}
}

void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block)
void checkSampleBlockContainsParentRequire(const Block & sample_block, const Names & parent_require)
{
std::unordered_map<String, DataTypePtr> schema_map;
for (const auto & column : schema)
schema_map[column.name] = column.type;
for (const auto & sample_block_column : sample_block)
for (const auto & parent_require_column : parent_require)
{
auto it = schema_map.find(sample_block_column.name);
if (unlikely(it == schema_map.end()))
if (unlikely(!sample_block.has(parent_require_column)))
throw TiFlashException(
fmt::format("schema {} don't contain sample block column: {}", schemaToString(schema), sample_block_column.name),
Errors::Planner::Internal);

const auto & type_in_schema = it->second->getName();
const auto & type_in_sample_block = sample_block_column.type->getName();
if (unlikely(type_in_sample_block != type_in_schema))
throw TiFlashException(
fmt::format(
"the type of column `{}` in schema `{}` is different from the one in sample block `{}`",
sample_block_column.name,
type_in_schema,
type_in_sample_block),
fmt::format("sample block {} don't contain parent_require column: {}", blockMetaToString(sample_block), parent_require_column),
Errors::Planner::Internal);
}
}
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/FinalizeHelper.h
@@ -30,5 +30,5 @@ void checkParentRequireContainsSchema(const Names & parent_require, const NamesA

void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTypes & schema);

void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block);
void checkSampleBlockContainsParentRequire(const Block & sample_block, const Names & parent_require);
} // namespace DB::FinalizeHelper
54 changes: 31 additions & 23 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
@@ -28,7 +28,6 @@
#include <Flash/Planner/plans/PhysicalMockExchangeSender.h>
#include <Flash/Planner/plans/PhysicalMockTableScan.h>
#include <Flash/Planner/plans/PhysicalProjection.h>
#include <Flash/Planner/plans/PhysicalSource.h>
#include <Flash/Planner/plans/PhysicalTableScan.h>
#include <Flash/Planner/plans/PhysicalTopN.h>
#include <Flash/Planner/plans/PhysicalWindow.h>
@@ -53,6 +52,16 @@ bool pushDownSelection(const PhysicalPlanNodePtr & plan, const String & executor
}
return false;
}

void fillOrderForListBasedExecutors(DAGContext & dag_context, const PhysicalPlanNodePtr & root_node)
{
auto & list_based_executors_order = dag_context.list_based_executors_order;
PhysicalPlanVisitor::visitPostOrder(root_node, [&](const PhysicalPlanNodePtr & plan) {
assert(plan);
if (plan->isRecordProfileStreams())
list_based_executors_order.push_back(plan->execId());
});
}
} // namespace

void PhysicalPlan::build(const tipb::DAGRequest * dag_request)
@@ -93,6 +102,7 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec
break;
case tipb::ExecType::TypeExchangeSender:
{
buildFinalProjection(fmt::format("{}_", executor_id), true);
if (unlikely(dagContext().isTest()))
pushBack(PhysicalMockExchangeSender::build(executor_id, log, popBack()));
else
@@ -129,27 +139,13 @@
}
case tipb::ExecType::TypeJoin:
{
auto right = popBack();
auto left = popBack();

/// Both sides of the join need a non-root final projection to ensure that
/// there are no duplicate columns in the blocks on the build and probe sides.
buildFinalProjection(fmt::format("{}_r_", executor_id), false);
auto right = popBack();

/// After DAGQueryBlock removed, `dagContext().isTest() && right->tp() != PlanType::Source`
/// and `dagContext().isTest() && right->tp() != PlanType::Source` will be removed.
if (dagContext().isTest() && right->tp() != PlanType::Source)
{
pushBack(right);
buildFinalProjection(fmt::format("{}_r_", executor_id), false);
right = popBack();
}

if (dagContext().isTest() && right->tp() != PlanType::Source)
{
pushBack(left);
buildFinalProjection(fmt::format("{}_l_", executor_id), false);
left = popBack();
}
buildFinalProjection(fmt::format("{}_l_", executor_id), false);
auto left = popBack();

pushBack(PhysicalJoin::build(context, executor_id, log, executor->join(), left, right));
break;
@@ -199,29 +195,41 @@ PhysicalPlanNodePtr PhysicalPlan::popBack()
return back;
}

void PhysicalPlan::buildSource(const String & executor_id, const BlockInputStreams & source_streams)
/// For MPP, the root final projection has already been added under PhysicalExchangeSender or PhysicalMockExchangeSender.
/// For batchcop/cop, which have no PhysicalExchangeSender or PhysicalMockExchangeSender, we need to add the root final projection here.
void PhysicalPlan::addRootFinalProjectionIfNeed()
{
pushBack(PhysicalSource::build(executor_id, source_streams, log));
assert(root_node);
if (root_node->tp() != PlanType::ExchangeSender && root_node->tp() != PlanType::MockExchangeSender)
{
pushBack(root_node);
buildFinalProjection(fmt::format("{}_", root_node->execId()), true);
root_node = popBack();
}
}

void PhysicalPlan::outputAndOptimize()
{
RUNTIME_ASSERT(!root_node, log, "root_node should be nullptr before `outputAndOptimize`");
RUNTIME_ASSERT(cur_plan_nodes.size() == 1, log, "There can only be one plan node output, but here are {}", cur_plan_nodes.size());

root_node = popBack();
addRootFinalProjectionIfNeed();

LOG_FMT_DEBUG(
log,
"build unoptimized physical plan: \n{}",
toString());

root_node = optimize(context, root_node);
root_node = optimize(context, root_node, log);
LOG_FMT_DEBUG(
log,
"build optimized physical plan: \n{}",
toString());

RUNTIME_ASSERT(root_node, log, "root_node shouldn't be nullptr after `outputAndOptimize`");

if (!dagContext().return_executor_id)
fillOrderForListBasedExecutors(dagContext(), root_node);
}

String PhysicalPlan::toString() const
12 changes: 6 additions & 6 deletions dbms/src/Flash/Planner/PhysicalPlan.h
@@ -34,12 +34,6 @@ class PhysicalPlan

void build(const tipb::DAGRequest * dag_request);

void build(const String & executor_id, const tipb::Executor * executor);

void buildSource(const String & executor_id, const BlockInputStreams & source_streams);

void buildFinalProjection(const String & column_prefix, bool is_root);

// after outputAndOptimize, the physical plan node tree is done.
void outputAndOptimize();

@@ -48,6 +42,12 @@ class PhysicalPlan
void transform(DAGPipeline & pipeline, Context & context, size_t max_streams);

private:
void addRootFinalProjectionIfNeed();

void build(const String & executor_id, const tipb::Executor * executor);

void buildFinalProjection(const String & column_prefix, bool is_root);

PhysicalPlanNodePtr popBack();

void pushBack(const PhysicalPlanNodePtr & plan);
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanNode.h
@@ -62,6 +62,8 @@ class PhysicalPlanNode
/// Obtain a sample block that contains the names and types of result columns.
virtual const Block & getSampleBlock() const = 0;

bool isRecordProfileStreams() const { return is_record_profile_streams; }

void disableRecordProfileStreams() { is_record_profile_streams = false; }

void disableRestoreConcurrency() { is_restore_concurrency = false; }
12 changes: 12 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanVisitor.h
@@ -30,5 +30,17 @@ void visit(const PhysicalPlanNodePtr & plan, FF && f)
}
}

/// visit the physical plan node tree in post order (children before the node itself) and apply function.
/// f: (const PhysicalPlanNodePtr &).
template <typename FF>
void visitPostOrder(const PhysicalPlanNodePtr & plan, FF && f)
{
for (size_t i = 0; i < plan->childrenSize(); ++i)
{
visitPostOrder(plan->children(i), std::forward<FF>(f));
}
f(plan);
}

String visitToString(const PhysicalPlanNodePtr & plan);
} // namespace DB::PhysicalPlanVisitor
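To make the traversal order concrete: fillOrderForListBasedExecutors in PhysicalPlan.cpp above relies on visitPostOrder visiting children before their parent, so for a list-based request the executor ids are collected in execution order (leaf first). A toy, self-contained analogue — the Node struct and executor ids below are made up for illustration and are not TiFlash types:

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Toy stand-in for PhysicalPlanNode: just an id and children.
struct Node
{
    std::string id;
    std::vector<std::shared_ptr<Node>> children;
};
using NodePtr = std::shared_ptr<Node>;

// Same shape as PhysicalPlanVisitor::visitPostOrder: recurse into children first, then apply f to the node.
void visitPostOrder(const NodePtr & node, const std::function<void(const NodePtr &)> & f)
{
    for (const auto & child : node->children)
        visitPostOrder(child, f);
    f(node);
}

int main()
{
    // A list-based plan: projection_2 -> aggregation_1 -> table_scan_0 (hypothetical executor ids).
    auto table_scan = std::make_shared<Node>(Node{"table_scan_0", {}});
    auto agg = std::make_shared<Node>(Node{"aggregation_1", {table_scan}});
    auto proj = std::make_shared<Node>(Node{"projection_2", {agg}});

    std::vector<std::string> order;
    visitPostOrder(proj, [&](const NodePtr & n) { order.push_back(n->id); });

    // Prints "table_scan_0 aggregation_1 projection_2": execution order, leaf first.
    for (const auto & id : order)
        std::cout << id << ' ';
    std::cout << '\n';
    return 0;
}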