Skip to content

Commit

Permalink
Interpreter: Mock exchange executors. (#4706)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored Apr 21, 2022
1 parent ebb28dc commit 28a97fd
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 39 deletions.
14 changes: 14 additions & 0 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1525,4 +1525,18 @@ ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr r
return join;
}

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type)
{
ExecutorPtr exchange_sender = std::make_shared<mock::ExchangeSender>(executor_index, input->output_schema, exchange_type);
exchange_sender->children.push_back(input);
return exchange_sender;
}


ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema)
{
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema);
return exchange_receiver;
}

} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,8 @@ ExecutorPtr compileProject(ExecutorPtr input, size_t & executor_index, ASTPtr se

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, ASTPtr params);

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema);

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/TestUtils/InterpreterTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ String toTreeString(const tipb::Executor & root_executor, size_t level)

auto append_str = [&buffer, &level](const tipb::Executor & executor) {
assert(executor.has_executor_id());

buffer.append(String(level, ' '));
buffer.append(executor.executor_id()).append("\n");
};
Expand Down
7 changes: 2 additions & 5 deletions dbms/src/TestUtils/InterpreterTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class MockExecutorTest : public ::testing::Test
MockExecutorTest()
: context(TiFlashTestEnv::getContext())
{}

static void SetUpTestCase()
{
try
Expand All @@ -53,8 +52,7 @@ class MockExecutorTest : public ::testing::Test
virtual void initializeContext()
{
dag_context_ptr = std::make_unique<DAGContext>(1024);
context.setDAGContext(dag_context_ptr.get());
mock_dag_request_context = MockDAGRequestContext();
context = MockDAGRequestContext(TiFlashTestEnv::getContext());
}

DAGContext & getDAGContext()
Expand All @@ -64,8 +62,7 @@ class MockExecutorTest : public ::testing::Test
}

protected:
Context context;
MockDAGRequestContext mock_dag_request_context;
MockDAGRequestContext context;
std::unique_ptr<DAGContext> dag_context_ptr;
};

Expand Down
77 changes: 67 additions & 10 deletions dbms/src/TestUtils/mockExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Debug/astToExecutor.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
Expand All @@ -20,6 +21,7 @@
#include <Parsers/ASTOrderByElement.h>
#include <TestUtils/TiFlashTestException.h>
#include <TestUtils/mockExecutor.h>
#include <tipb/executor.pb.h>
namespace DB::tests
{
ASTPtr buildColumn(const String & column_name)
Expand All @@ -34,14 +36,15 @@ ASTPtr buildLiteral(const Field & field)

ASTPtr buildOrderByItemList(MockOrderByItems order_by_items)
{
std::vector<ASTPtr> vec;
std::vector<ASTPtr> vec(order_by_items.size());
size_t i = 0;
for (auto item : order_by_items)
{
int direction = item.second ? 1 : -1;
int direction = item.second ? 1 : -1; // todo
ASTPtr locale_node;
auto order_by_item = std::make_shared<ASTOrderByElement>(direction, direction, false, locale_node);
order_by_item->children.push_back(std::make_shared<ASTIdentifier>(item.first));
vec.push_back(order_by_item);
vec[i++] = order_by_item;
}
auto exp_list = std::make_shared<ASTExpressionList>();
exp_list->children.insert(exp_list->children.end(), vec.begin(), vec.end());
Expand All @@ -67,13 +70,13 @@ void DAGRequestBuilder::initDAGRequest(tipb::DAGRequest & dag_request)
}

// traval the AST tree to build tipb::Executor recursively.
std::shared_ptr<tipb::DAGRequest> DAGRequestBuilder::build(Context & context)
std::shared_ptr<tipb::DAGRequest> DAGRequestBuilder::build(MockDAGRequestContext & mock_context)
{
MPPInfo mpp_info(properties.start_ts, -1, -1, {}, {});
MPPInfo mpp_info(properties.start_ts, -1, -1, {}, mock_context.receiver_source_task_ids_map);
std::shared_ptr<tipb::DAGRequest> dag_request_ptr = std::make_shared<tipb::DAGRequest>();
tipb::DAGRequest & dag_request = *dag_request_ptr;
initDAGRequest(dag_request);
root->toTiPBExecutor(dag_request.mutable_root_executor(), properties.collator, mpp_info, context);
root->toTiPBExecutor(dag_request.mutable_root_executor(), properties.collator, mpp_info, mock_context.context);
root.reset();
executor_index = 0;
return dag_request_ptr;
Expand All @@ -96,7 +99,7 @@ DAGRequestBuilder & DAGRequestBuilder::mockTable(const String & db, const String
return *this;
}

DAGRequestBuilder & DAGRequestBuilder::mockTable(const MockTableName & name, const std::vector<std::pair<String, TiDB::TP>> & columns)
DAGRequestBuilder & DAGRequestBuilder::mockTable(const MockTableName & name, const MockColumnInfos & columns)
{
return mockTable(name.first, name.second, columns);
}
Expand All @@ -106,6 +109,31 @@ DAGRequestBuilder & DAGRequestBuilder::mockTable(const MockTableName & name, con
return mockTable(name.first, name.second, columns);
}

DAGRequestBuilder & DAGRequestBuilder::exchangeReceiver(const MockColumnInfos & columns)
{
return buildExchangeReceiver(columns);
}

DAGRequestBuilder & DAGRequestBuilder::exchangeReceiver(const MockColumnInfoList & columns)
{
return buildExchangeReceiver(columns);
}

DAGRequestBuilder & DAGRequestBuilder::buildExchangeReceiver(const MockColumnInfos & columns)
{
DAGSchema schema;
for (const auto & column : columns)
{
TiDB::ColumnInfo info;
info.tp = column.second;
info.name = column.first;
schema.push_back({column.first, info});
}

root = compileExchangeReceiver(getExecutorIndex(), schema);
return *this;
}

DAGRequestBuilder & DAGRequestBuilder::filter(ASTPtr filter_expr)
{
assert(root);
Expand Down Expand Up @@ -185,6 +213,13 @@ DAGRequestBuilder & DAGRequestBuilder::project(MockColumnNames col_names)
return *this;
}

DAGRequestBuilder & DAGRequestBuilder::exchangeSender(tipb::ExchangeType exchange_type)
{
assert(root);
root = compileExchangeSender(root, getExecutorIndex(), exchange_type);
return *this;
}

DAGRequestBuilder & DAGRequestBuilder::join(const DAGRequestBuilder & right, ASTPtr using_expr_list)
{
return join(right, using_expr_list, ASTTableJoin::Kind::Inner);
Expand Down Expand Up @@ -230,13 +265,13 @@ DAGRequestBuilder & DAGRequestBuilder::buildAggregation(ASTPtr agg_funcs, ASTPtr
return *this;
}


void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoList & columns)
{
std::vector<MockColumnInfo> v_column_info;
std::vector<MockColumnInfo> v_column_info(columns.size());
size_t i = 0;
for (const auto & info : columns)
{
v_column_info.push_back(std::move(info));
v_column_info[i++] = std::move(info);
}
mock_tables[name.first + "." + name.second] = v_column_info;
}
Expand All @@ -251,9 +286,31 @@ void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockC
mock_tables[name.first + "." + name.second] = columns;
}

void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfos & columns)
{
exchange_schemas[name] = columns;
}

void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfoList & columns)
{
std::vector<MockColumnInfo> v_column_info(columns.size());
size_t i = 0;
for (const auto & info : columns)
{
v_column_info[i++] = std::move(info);
}
exchange_schemas[name] = v_column_info;
}

DAGRequestBuilder MockDAGRequestContext::scan(String db_name, String table_name)
{
return DAGRequestBuilder(index).mockTable({db_name, table_name}, mock_tables[db_name + "." + table_name]);
}

DAGRequestBuilder MockDAGRequestContext::receive(String exchange_name)
{
auto builder = DAGRequestBuilder(index).exchangeReceiver(exchange_schemas[exchange_name]);
receiver_source_task_ids_map[builder.getRoot()->name] = {};
return builder;
}
} // namespace DB::tests
33 changes: 29 additions & 4 deletions dbms/src/TestUtils/mockExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Debug/astToExecutor.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <tipb/executor.pb.h>

#include <initializer_list>
#include <unordered_map>
Expand All @@ -32,6 +33,7 @@ using MockOrderByItems = std::initializer_list<MockOrderByItem>;
using MockColumnNames = std::initializer_list<String>;
using MockAsts = std::initializer_list<ASTPtr>;

class MockDAGRequestContext;

/** Responsible for Hand write tipb::DAGRequest
* Use this class to mock DAGRequest, then feed the DAGRequest into
Expand All @@ -51,14 +53,23 @@ class DAGRequestBuilder

explicit DAGRequestBuilder(size_t & index)
: executor_index(index)
{}
{
}

ExecutorPtr getRoot()
{
return root;
}

std::shared_ptr<tipb::DAGRequest> build(Context & context);
std::shared_ptr<tipb::DAGRequest> build(MockDAGRequestContext & mock_context);

DAGRequestBuilder & mockTable(const String & db, const String & table, const MockColumnInfos & columns);
DAGRequestBuilder & mockTable(const MockTableName & name, const MockColumnInfos & columns);
DAGRequestBuilder & mockTable(const MockTableName & name, const MockColumnInfoList & columns);

DAGRequestBuilder & exchangeReceiver(const MockColumnInfos & columns);
DAGRequestBuilder & exchangeReceiver(const MockColumnInfoList & columns);

DAGRequestBuilder & filter(ASTPtr filter_expr);

DAGRequestBuilder & limit(int limit);
Expand All @@ -73,6 +84,8 @@ class DAGRequestBuilder
DAGRequestBuilder & project(MockAsts expr);
DAGRequestBuilder & project(MockColumnNames col_names);

DAGRequestBuilder & exchangeSender(tipb::ExchangeType exchange_type);

// Currentlt only support inner join, left join and right join.
// TODO support more types of join.
DAGRequestBuilder & join(const DAGRequestBuilder & right, ASTPtr using_expr_list);
Expand All @@ -85,6 +98,7 @@ class DAGRequestBuilder
private:
void initDAGRequest(tipb::DAGRequest & dag_request);
DAGRequestBuilder & buildAggregation(ASTPtr agg_funcs, ASTPtr group_by_exprs);
DAGRequestBuilder & buildExchangeReceiver(const MockColumnInfos & columns);

ExecutorPtr root;
DAGProperties properties;
Expand All @@ -97,7 +111,8 @@ class DAGRequestBuilder
class MockDAGRequestContext
{
public:
MockDAGRequestContext()
explicit MockDAGRequestContext(Context context_)
: context(context_)
{
index = 0;
}
Expand All @@ -110,12 +125,22 @@ class MockDAGRequestContext
void addMockTable(const MockTableName & name, const MockColumnInfoList & columns);
void addMockTable(const String & db, const String & table, const MockColumnInfos & columns);
void addMockTable(const MockTableName & name, const MockColumnInfos & columns);

void addExchangeRelationSchema(String name, const MockColumnInfos & columns);
void addExchangeRelationSchema(String name, const MockColumnInfoList & columns);
DAGRequestBuilder scan(String db_name, String table_name);
DAGRequestBuilder receive(String exchange_name);

private:
size_t index;
std::unordered_map<String, MockColumnInfos> mock_tables;
std::unordered_map<String, MockColumnInfos> exchange_schemas;

public:
// Currently don't support task_id, so the following to structure is useless,
// but we need it to contruct the TaskMeta.
// In TiFlash, we use task_id to identify an Mpp Task.
std::unordered_map<String, std::vector<Int64>> receiver_source_task_ids_map;
Context context;
};

ASTPtr buildColumn(const String & column_name);
Expand Down
Loading

0 comments on commit 28a97fd

Please sign in to comment.