Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate table scan related code to DAGStorageInterpreter #4783

Merged
298 changes: 4 additions & 294 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ class DAGQueryBlockInterpreter
#endif
void executeImpl(DAGPipeline & pipeline);
void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void executeCastAfterTableScan(
const TiDBTableScan & table_scan,
const std::vector<ExtraCastAfterTSMode> & is_need_add_cast_column,
size_t remote_read_streams_start_index,
DAGPipeline & pipeline);
void executePushedDownFilter(const std::vector<const tipb::Expr *> & conditions, size_t remote_read_streams_start_index, DAGPipeline & pipeline);
void handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query);
void prepareJoin(
const google::protobuf::RepeatedPtrField<tipb::Expr> & keys,
Expand Down Expand Up @@ -108,10 +102,6 @@ class DAGQueryBlockInterpreter

void restorePipelineConcurrency(DAGPipeline & pipeline);

void executeRemoteQueryImpl(
DAGPipeline & pipeline,
std::vector<RemoteRequest> & remote_requests);

DAGContext & dagContext() const { return *context.getDAGContext(); }

Context & context;
Expand Down
379 changes: 339 additions & 40 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp

Large diffs are not rendered by default.

41 changes: 29 additions & 12 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/DAGQueryBlock.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -50,9 +49,8 @@ class DAGStorageInterpreter
public:
DAGStorageInterpreter(
Context & context_,
const DAGQueryBlock & query_block_,
const TiDBTableScan & table_scan,
const std::vector<const tipb::Expr *> & conditions_,
const PushDownFilter & push_down_filter_,
size_t max_streams_);

DAGStorageInterpreter(DAGStorageInterpreter &&) = delete;
Expand All @@ -63,12 +61,6 @@ class DAGStorageInterpreter
/// Members will be transferred to DAGQueryBlockInterpreter after execute

std::unique_ptr<DAGExpressionAnalyzer> analyzer;
std::vector<ExtraCastAfterTSMode> is_need_add_cast_column;
/// It shouldn't be a hash map, because duplicated region ids may occur when merging the regions to retry for a DAG request.
RegionRetryList region_retry_from_local_region;
TableLockHolders drop_locks;
std::vector<RemoteRequest> remote_requests;
BlockInputStreamPtr null_stream_if_empty;

private:
struct StorageWithStructureLock
Expand All @@ -92,12 +84,37 @@ class DAGStorageInterpreter

std::unordered_map<TableID, SelectQueryInfo> generateSelectQueryInfos();

DAGContext & dagContext() const;

void recordProfileStreams(DAGPipeline & pipeline, const String & key);

void executeRemoteQuery(DAGPipeline & pipeline);

void executeCastAfterTableScan(
size_t remote_read_streams_start_index,
DAGPipeline & pipeline);

void executePushedDownFilter(
size_t remote_read_streams_start_index,
DAGPipeline & pipeline);

void prepare();

void executeImpl(DAGPipeline & pipeline);

private:
std::vector<ExtraCastAfterTSMode> is_need_add_cast_column;
/// It shouldn't be a hash map, because duplicated region ids may occur when merging the regions to retry for a DAG request.
RegionRetryList region_retry_from_local_region;
TableLockHolders drop_locks;
std::vector<RemoteRequest> remote_requests;
BlockInputStreamPtr null_stream_if_empty;

/// passed from caller, doesn't change during DAGStorageInterpreter's lifetime

Context & context;
const DAGQueryBlock & query_block;
const TiDBTableScan & table_scan;
const std::vector<const tipb::Expr *> & conditions;
const PushDownFilter & push_down_filter;
size_t max_streams;
LoggerPtr log;

Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <DataStreams/SharedQueryBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Interpreters/Context.h>

namespace DB
{
Expand Down Expand Up @@ -79,4 +80,17 @@ void executeUnion(
pipeline.streams.push_back(non_joined_data_stream);
}
}

/// Creates an ExpressionActions whose input columns are the (name, type) pairs
/// of `stream`'s header and whose single action is a projection onto `project_cols`.
///
/// @param stream        stream whose header supplies the input columns.
/// @param context       source of the settings passed to ExpressionActions.
/// @param project_cols  (name, alias) pairs handed to ExpressionAction::project.
/// @return the newly built projection actions.
ExpressionActionsPtr generateProjectExpressionActions(
    const BlockInputStreamPtr & stream,
    const Context & context,
    const NamesWithAliases & project_cols)
{
    // Collect the header's columns as the input schema of the projection.
    const auto & header = stream->getHeader();
    NamesAndTypesList header_columns;
    for (const auto & header_column : header)
        header_columns.emplace_back(header_column.name, header_column.type);

    ExpressionActionsPtr project_actions
        = std::make_shared<ExpressionActions>(header_columns, context.getSettingsRef());
    project_actions->add(ExpressionAction::project(project_cols));
    return project_actions;
}
} // namespace DB
8 changes: 8 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

#include <Common/Logger.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Interpreters/ExpressionActions.h>

namespace DB
{
class Context;

void restoreConcurrency(
DAGPipeline & pipeline,
size_t concurrency,
Expand All @@ -35,4 +38,9 @@ void executeUnion(
size_t max_streams,
const LoggerPtr & log,
bool ignore_block = false);

ExpressionActionsPtr generateProjectExpressionActions(
const BlockInputStreamPtr & stream,
const Context & context,
const NamesWithAliases & project_cols);
} // namespace DB
65 changes: 65 additions & 0 deletions dbms/src/Flash/Coprocessor/PushDownFilter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <common/likely.h>

namespace DB
{
/// Stores the executor id and the condition pointers of a pushed-down filter.
///
/// The two arguments must be consistent: either both are empty (no filter)
/// or both are non-empty. Any other combination is rejected.
///
/// @throws TiFlashException (Errors::Coprocessor::BadRequest) when exactly one
///         of `executor_id_` / `conditions_` is empty.
PushDownFilter::PushDownFilter(
    const String & executor_id_,
    const std::vector<const tipb::Expr *> & conditions_)
    : executor_id(executor_id_)
    , conditions(conditions_)
{
    const bool has_executor_id = !executor_id.empty();
    const bool has_conditions = !conditions.empty();
    const bool inconsistent = (has_executor_id != has_conditions);
    if (unlikely(inconsistent))
        throw TiFlashException(
            "for PushDownFilter, conditions and executor_id should both be empty or neither should be empty",
            Errors::Coprocessor::BadRequest);
}

/// Turns `mutable_executor` into a selection executor carrying this filter,
/// if the filter has a value.
///
/// @param mutable_executor  executor to fill in; it is dereferenced whenever
///                          the filter has a value.
/// @return the selection's mutable child when a selection was written,
///         otherwise `mutable_executor` itself, untouched.
tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const
{
    // No filter: the caller keeps building directly on the executor it passed in.
    if (!hasValue())
        return mutable_executor;

    mutable_executor->set_tp(tipb::ExecType::TypeSelection);
    mutable_executor->set_executor_id(executor_id);

    auto * selection = mutable_executor->mutable_selection();
    for (const tipb::Expr * cond : conditions)
    {
        // Deep-copy each condition into the newly built selection.
        *selection->add_conditions() = *cond;
    }
    return selection->mutable_child();
}

/// Builds a PushDownFilter from a filter executor.
///
/// @param filter_executor  may be null; an executor without a selection is
///                         treated the same as null.
/// @return an empty filter (empty id, no conditions) for a null or
///         selection-less executor; otherwise a filter holding the executor's
///         id and pointers to the conditions of its selection. The pointers
///         refer into `filter_executor`; nothing is copied.
PushDownFilter PushDownFilter::toPushDownFilter(const tipb::Executor * filter_executor)
{
    const bool has_selection = filter_executor != nullptr && filter_executor->has_selection();
    if (!has_selection)
        return {"", {}};

    const auto & selection = filter_executor->selection();
    std::vector<const tipb::Expr *> condition_ptrs;
    for (const auto & expr : selection.conditions())
        condition_ptrs.push_back(&expr);

    return {filter_executor->executor_id(), condition_ptrs};
}
} // namespace DB
39 changes: 39 additions & 0 deletions dbms/src/Flash/Coprocessor/PushDownFilter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <common/types.h>
#include <tipb/executor.pb.h>

#include <vector>

namespace DB
{
/// Description of a filter pushed down to a table scan: the id of the filter
/// executor plus non-owning pointers to its condition expressions.
struct PushDownFilter
{
    /// Stores copies of `executor_id_` and of the pointer list `conditions_`.
    PushDownFilter(
        const String & executor_id_,
        const std::vector<const tipb::Expr *> & conditions_);

    /// Creates a PushDownFilter from `filter_executor`.
    static PushDownFilter toPushDownFilter(const tipb::Executor * filter_executor);

    /// True when at least one condition is present.
    bool hasValue() const
    {
        return !conditions.empty();
    }

    /// Takes the executor to build on for a remote read and returns an executor pointer.
    tipb::Executor * constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const;

    /// Id of the filter executor.
    String executor_id;
    /// Condition expressions, held as raw non-owning pointers.
    std::vector<const tipb::Expr *> conditions;
};
} // namespace DB
19 changes: 8 additions & 11 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

namespace DB
{
RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const tipb::Executor * selection, LoggerPtr & log)
RemoteRequest RemoteRequest::build(
const RegionRetryList & retry_regions,
DAGContext & dag_context,
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const PushDownFilter & push_down_filter,
const LoggerPtr & log)
{
auto print_retry_regions = [&retry_regions, &table_info] {
FmtBuffer buffer;
Expand All @@ -35,16 +41,7 @@ RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGCon

DAGSchema schema;
tipb::DAGRequest dag_req;
auto * executor = dag_req.mutable_root_executor();
if (selection != nullptr)
{
executor->set_tp(tipb::ExecType::TypeSelection);
executor->set_executor_id(selection->executor_id());
auto * new_selection = executor->mutable_selection();
for (const auto & condition : selection->selection().conditions())
*new_selection->add_conditions() = condition;
executor = new_selection->mutable_child();
}
auto * executor = push_down_filter.constructSelectionForRemoteRead(dag_req.mutable_root_executor());

{
tipb::Executor * ts_exec = executor;
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Flash/Coprocessor/RemoteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <pingcap/coprocessor/Client.h>
Expand All @@ -34,7 +35,10 @@ using DAGSchema = std::vector<DAGColumnInfo>;

struct RemoteRequest
{
RemoteRequest(tipb::DAGRequest && dag_request_, DAGSchema && schema_, std::vector<pingcap::coprocessor::KeyRange> && key_ranges_)
RemoteRequest(
tipb::DAGRequest && dag_request_,
DAGSchema && schema_,
std::vector<pingcap::coprocessor::KeyRange> && key_ranges_)
: dag_request(std::move(dag_request_))
, schema(std::move(schema_))
, key_ranges(std::move(key_ranges_))
Expand All @@ -43,6 +47,12 @@ struct RemoteRequest
DAGSchema schema;
/// the sorted key ranges
std::vector<pingcap::coprocessor::KeyRange> key_ranges;
static RemoteRequest build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const tipb::Executor * selection, LoggerPtr & log);
static RemoteRequest build(
const RegionRetryList & retry_regions,
DAGContext & dag_context,
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const PushDownFilter & push_down_filter,
const LoggerPtr & log);
};
} // namespace DB
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

namespace DB
{
TiDBTableScan::TiDBTableScan(const tipb::Executor * table_scan_, const DAGContext & dag_context)
TiDBTableScan::TiDBTableScan(
const tipb::Executor * table_scan_,
const String & executor_id_,
const DAGContext & dag_context)
: table_scan(table_scan_)
, executor_id(executor_id_)
, is_partition_table_scan(table_scan->tp() == tipb::TypePartitionTableScan)
, columns(is_partition_table_scan ? table_scan->partition_table_scan().columns() : table_scan->tbl_scan().columns())
{
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ namespace DB
class TiDBTableScan
{
public:
TiDBTableScan(const tipb::Executor * table_scan_, const DAGContext & dag_context);
TiDBTableScan(
const tipb::Executor * table_scan_,
const String & executor_id_,
const DAGContext & dag_context);
bool isPartitionTableScan() const
{
return is_partition_table_scan;
Expand All @@ -48,11 +51,12 @@ class TiDBTableScan
}
String getTableScanExecutorID() const
{
return table_scan->executor_id();
return executor_id;
}

private:
const tipb::Executor * table_scan;
String executor_id;
bool is_partition_table_scan;
const google::protobuf::RepeatedPtrField<tipb::ColumnInfo> & columns;
/// logical_table_id is the table id for a TiDB table, while if the
Expand Down