Skip to content

Commit

Permalink
Planner: support table scan (#5335)
Browse files Browse the repository at this point in the history
ref #4739
  • Loading branch information
SeaRise authored Jul 25, 2022
1 parent aa20f61 commit 42b4055
Show file tree
Hide file tree
Showing 20 changed files with 1,304 additions and 111 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
{
if (context.getDAGContext()->columnsForTestEmpty() || context.getDAGContext()->columnsForTest(table_scan.getTableScanExecutorID()).empty())
{
auto names_and_types = genNamesAndTypes(table_scan);
auto names_and_types = genNamesAndTypes(table_scan, "mock_table_scan");
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
Expand Down
33 changes: 24 additions & 9 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,49 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Storages/MutableSupport.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TypeMapping.h>

namespace DB
{
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan)
namespace
{
/// Resolves the computing-layer data type for the primary-key column described by `column_info`.
///
/// The resolved type has to equal either MutableSupport::tidb_pk_column_int_type or
/// MutableSupport::tidb_pk_column_string_type; any other type is reported by throwing
/// an Exception carrying ErrorCodes::LOGICAL_ERROR.
DataTypePtr getPkType(const ColumnInfo & column_info)
{
    const auto & resolved_type = getDataTypeByColumnInfoForComputingLayer(column_info);

    // The string-pk comparison is only evaluated when the int-pk comparison fails.
    const bool is_supported_pk_type = resolved_type->equals(*MutableSupport::tidb_pk_column_int_type)
        || resolved_type->equals(*MutableSupport::tidb_pk_column_string_type);

    if (unlikely(!is_supported_pk_type))
    {
        throw Exception(
            fmt::format(
                "Actual pk_data_type {} is not {} or {}",
                resolved_type->getName(),
                MutableSupport::tidb_pk_column_int_type->getName(),
                MutableSupport::tidb_pk_column_string_type->getName()),
            ErrorCodes::LOGICAL_ERROR);
    }

    return resolved_type;
}
} // namespace

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix)
{
NamesAndTypes names_and_types;
names_and_types.reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
TiDB::ColumnInfo column_info;
const auto & ci = table_scan.getColumns()[i];
column_info.tp = static_cast<TiDB::TP>(ci.tp());
column_info.id = ci.column_id();

auto column_info = TiDB::toTiDBColumnInfo(table_scan.getColumns()[i]);
switch (column_info.id)
{
case TiDBPkColumnID:
// TODO: need to check if the type of pk_handle_columns matches the type that used in delta merge tree.
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getDataTypeByColumnInfoForComputingLayer(column_info));
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getPkType(column_info));
break;
case ExtraTableIDColumnID:
names_and_types.emplace_back(MutableSupport::extra_table_id_column_name, MutableSupport::extra_table_id_column_type);
break;
default:
names_and_types.emplace_back(fmt::format("mock_table_scan_{}", i), getDataTypeByColumnInfoForComputingLayer(column_info));
names_and_types.emplace_back(fmt::format("{}_{}", column_prefix, i), getDataTypeByColumnInfoForComputingLayer(column_info));
}
}
return names_and_types;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <common/StringRef.h>

namespace DB
{
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan);
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix);
ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema);
} // namespace DB
9 changes: 7 additions & 2 deletions dbms/src/Flash/Coprocessor/PushDownFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,15 @@ PushDownFilter PushDownFilter::toPushDownFilter(const String & executor_id, cons
return {"", {}};
}

return toPushDownFilter(executor_id, executor->selection());
}

PushDownFilter PushDownFilter::toPushDownFilter(const String & executor_id, const tipb::Selection & selection)
{
std::vector<const tipb::Expr *> conditions;
for (const auto & condition : executor->selection().conditions())
for (const auto & condition : selection.conditions())
conditions.push_back(&condition);

return {executor_id, conditions};
}
} // namespace DB
} // namespace DB
10 changes: 7 additions & 3 deletions dbms/src/Flash/Coprocessor/PushDownFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ struct PushDownFilter
{
static PushDownFilter toPushDownFilter(const String & executor_id, const tipb::Executor * executor);

static PushDownFilter toPushDownFilter(const String & executor_id, const tipb::Selection & selection);

PushDownFilter() = default;

PushDownFilter(
const String & executor_id_,
const std::vector<const tipb::Expr *> & conditions_);
Expand All @@ -33,7 +37,7 @@ struct PushDownFilter

tipb::Executor * constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const;

String executor_id;
std::vector<const tipb::Expr *> conditions;
String executor_id{};
std::vector<const tipb::Expr *> conditions{};
};
} // namespace DB
} // namespace DB
38 changes: 37 additions & 1 deletion dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,33 @@
#include <Flash/Planner/plans/PhysicalLimit.h>
#include <Flash/Planner/plans/PhysicalMockExchangeReceiver.h>
#include <Flash/Planner/plans/PhysicalMockExchangeSender.h>
#include <Flash/Planner/plans/PhysicalMockTableScan.h>
#include <Flash/Planner/plans/PhysicalProjection.h>
#include <Flash/Planner/plans/PhysicalSource.h>
#include <Flash/Planner/plans/PhysicalTableScan.h>
#include <Flash/Planner/plans/PhysicalTopN.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Interpreters/Context.h>

namespace DB
{
namespace
{
/// Tries to attach `selection` to `plan` as a push-down filter instead of building a separate filter node.
///
/// Returns true only when `plan` is of type PlanType::TableScan and does not already have a
/// push-down filter; in that case the filter is handed to the table scan together with `executor_id`.
/// In every other case `plan` is left untouched and false is returned.
bool pushDownSelection(const PhysicalPlanNodePtr & plan, const String & executor_id, const tipb::Selection & selection)
{
    // Only table scan plans can take a pushed-down selection.
    if (!(plan->tp() == PlanType::TableScan))
        return false;

    auto table_scan_plan = std::static_pointer_cast<PhysicalTableScan>(plan);

    // A table scan accepts a push-down filter only once.
    if (table_scan_plan->hasPushDownFilter())
        return false;

    table_scan_plan->pushDownFilter(executor_id, selection);
    return true;
}
} // namespace

void PhysicalPlan::build(const tipb::DAGRequest * dag_request)
{
assert(dag_request);
Expand All @@ -56,8 +75,14 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec
pushBack(PhysicalTopN::build(context, executor_id, log, executor->topn(), popBack()));
break;
case tipb::ExecType::TypeSelection:
pushBack(PhysicalFilter::build(context, executor_id, log, executor->selection(), popBack()));
{
auto child = popBack();
if (pushDownSelection(child, executor_id, executor->selection()))
pushBack(child);
else
pushBack(PhysicalFilter::build(context, executor_id, log, executor->selection(), child));
break;
}
case tipb::ExecType::TypeAggregation:
case tipb::ExecType::TypeStreamAgg:
pushBack(PhysicalAggregation::build(context, executor_id, log, executor->aggregation(), popBack()));
Expand All @@ -81,6 +106,17 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec
case tipb::ExecType::TypeProjection:
pushBack(PhysicalProjection::build(context, executor_id, log, executor->projection(), popBack()));
break;
case tipb::ExecType::TypeTableScan:
case tipb::ExecType::TypePartitionTableScan:
{
TiDBTableScan table_scan(executor, executor_id, dagContext());
if (unlikely(dagContext().isTest()))
pushBack(PhysicalMockTableScan::build(context, executor_id, log, table_scan));
else
pushBack(PhysicalTableScan::build(executor_id, log, table_scan));
dagContext().table_scan_executor_id = executor_id;
break;
}
default:
throw TiFlashException(fmt::format("{} executor is not supported", executor->tp()), Errors::Planner::Unimplemented);
}
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Planner/PlanType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ String PlanType::toString() const
return "MockExchangeReceiver";
case Projection:
return "Projection";
case TableScan:
return "TableScan";
case MockTableScan:
return "MockTableScan";
default:
throw TiFlashException("Unknown PlanType", Errors::Planner::Internal);
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/PlanType.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ struct PlanType
ExchangeReceiver = 7,
MockExchangeReceiver = 8,
Projection = 9,
TableScan = 10,
MockTableScan = 11,
};
PlanTypeEnum enum_value;

Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Flash/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ namespace DB
{
namespace
{
void analyzePhysicalPlan(Context & context, PhysicalPlan & physical_plan, const DAGQueryBlock & query_block)
void analyzePhysicalPlan(PhysicalPlan & physical_plan, const DAGQueryBlock & query_block)
{
assert(query_block.source);
physical_plan.build(query_block.source_name, query_block.source);

// selection on table scan had been executed in table scan.
// In test mode, filter is not pushed down to table scan.
if (query_block.selection && (!query_block.isTableScanSource() || context.getDAGContext()->isTest()))
if (query_block.selection)
{
physical_plan.build(query_block.selection_name, query_block.selection);
}
Expand Down Expand Up @@ -92,10 +90,13 @@ bool Planner::isSupported(const DAGQueryBlock & query_block)
return !enableFineGrainedShuffle(query_block.source->fine_grained_shuffle_stream_count())
&& (!query_block.exchange_sender || !enableFineGrainedShuffle(query_block.exchange_sender->fine_grained_shuffle_stream_count()));
};
return query_block.source
&& (query_block.source->tp() == tipb::ExecType::TypeProjection
|| query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver)
&& disable_fine_frained_shuffle(query_block);
static auto has_supported_source = [](const DAGQueryBlock & query_block) {
return query_block.source
&& (query_block.source->tp() == tipb::ExecType::TypeProjection
|| query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver
|| query_block.isTableScanSource());
};
return has_supported_source(query_block) && disable_fine_frained_shuffle(query_block);
}

DAGContext & Planner::dagContext() const
Expand All @@ -118,7 +119,7 @@ void Planner::executeImpl(DAGPipeline & pipeline)
physical_plan.buildSource(input_streams);
}

analyzePhysicalPlan(context, physical_plan, query_block);
analyzePhysicalPlan(physical_plan, query_block);

physical_plan.outputAndOptimize();

Expand Down
46 changes: 34 additions & 12 deletions dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,24 @@

namespace DB
{
PhysicalMockExchangeReceiver::PhysicalMockExchangeReceiver(
const String & executor_id_,
const NamesAndTypes & schema_,
const String & req_id,
const Block & sample_block_,
const BlockInputStreams & mock_streams_)
: PhysicalLeaf(executor_id_, PlanType::MockExchangeReceiver, schema_, req_id)
, sample_block(sample_block_)
, mock_streams(mock_streams_)
{}

PhysicalPlanNodePtr PhysicalMockExchangeReceiver::build(
namespace
{
std::pair<NamesAndTypes, BlockInputStreams> mockSchemaAndStreams(
Context & context,
const String & executor_id,
const LoggerPtr & log,
const tipb::ExchangeReceiver & exchange_receiver)
{
NamesAndTypes schema;
BlockInputStreams mock_streams;

auto & dag_context = *context.getDAGContext();
size_t max_streams = dag_context.initialize_concurrency;
assert(max_streams > 0);

if (dag_context.columnsForTestEmpty() || dag_context.columnsForTest(executor_id).empty())
{
/// build with default blocks.
for (size_t i = 0; i < max_streams; ++i)
// use max_block_size / 10 to determine the mock block's size
mock_streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(exchange_receiver, context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
Expand All @@ -53,13 +48,40 @@ PhysicalPlanNodePtr PhysicalMockExchangeReceiver::build(
}
else
{
/// build from user input blocks.
auto [names_and_types, mock_exchange_streams] = mockSourceStream<MockExchangeReceiverInputStream>(context, max_streams, log, executor_id);
schema = std::move(names_and_types);
mock_streams.insert(mock_streams.end(), mock_exchange_streams.begin(), mock_exchange_streams.end());
}

assert(!schema.empty());
assert(!mock_streams.empty());

return {std::move(schema), std::move(mock_streams)};
}
} // namespace

/// Constructs the mock exchange receiver plan node.
///
/// `executor_id_`, `schema_` and `req_id` are forwarded to the PhysicalLeaf base together with
/// the fixed plan type PlanType::MockExchangeReceiver. `sample_block_` and `mock_streams_` are
/// copied into the corresponding members.
PhysicalMockExchangeReceiver::PhysicalMockExchangeReceiver(
const String & executor_id_,
const NamesAndTypes & schema_,
const String & req_id,
const Block & sample_block_,
const BlockInputStreams & mock_streams_)
: PhysicalLeaf(executor_id_, PlanType::MockExchangeReceiver, schema_, req_id)
, sample_block(sample_block_)
, mock_streams(mock_streams_)
{}

PhysicalPlanNodePtr PhysicalMockExchangeReceiver::build(
Context & context,
const String & executor_id,
const LoggerPtr & log,
const tipb::ExchangeReceiver & exchange_receiver)
{
assert(context.getDAGContext()->isTest());

auto [schema, mock_streams] = mockSchemaAndStreams(context, executor_id, log, exchange_receiver);

auto physical_mock_exchange_receiver = std::make_shared<PhysicalMockExchangeReceiver>(
executor_id,
schema,
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

namespace DB
{
/**
* A physical plan node that generates MockExchangeReceiverInputStream.
* Used in gtest to test execution logic.
* Only available with `DAGContext.isTest() == true`.
*/
class PhysicalMockExchangeReceiver : public PhysicalLeaf
{
public:
Expand All @@ -43,6 +48,7 @@ class PhysicalMockExchangeReceiver : public PhysicalLeaf
private:
void transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) override;

private:
Block sample_block;

BlockInputStreams mock_streams;
Expand Down
Loading

0 comments on commit 42b4055

Please sign in to comment.