Skip to content

Commit

Permalink
put the split info into struct
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and zhejiangxiaomai committed Dec 14, 2022
1 parent bc8e8b7 commit 4d6848c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ void registerSimpleFunctions() {
registerFunction<YearFunction, int32_t, Timestamp>({"year"});
registerFunction<YearFunction, int32_t, Date>({"year"});
registerFunction<YearFunction, int32_t, TimestampWithTimezone>({"year"});
registerFunction<YearFunction, int64_t, Timestamp>({"year"});
registerFunction<YearFunction, int64_t, Date>({"year"});
registerFunction<YearFunction, int64_t, TimestampWithTimezone>({"year"});
// registerFunction<YearFunction, int64_t, Timestamp>({"year"});
// registerFunction<YearFunction, int64_t, Date>({"year"});
// registerFunction<YearFunction, int64_t, TimestampWithTimezone>({"year"});
registerFunction<QuarterFunction, int64_t, Timestamp>({"quarter"});
registerFunction<QuarterFunction, int64_t, Date>({"quarter"});
registerFunction<QuarterFunction, int64_t, TimestampWithTimezone>(
Expand Down
5 changes: 2 additions & 3 deletions velox/substrait/SubstraitToVeloxPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(

core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::ReadRel& readRel,
memory::MemoryPool* pool,
const RowTypePtr& type) {
::substrait::ReadRel_VirtualTable readVirtualTable = readRel.virtual_table();
int64_t numVectors = readVirtualTable.values_size();
Expand Down Expand Up @@ -501,11 +500,11 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
}
}
children.emplace_back(
setVectorFromVariants(outputChildType, batchChild, pool));
setVectorFromVariants(outputChildType, batchChild, pool_));
}

vectors.emplace_back(
std::make_shared<RowVector>(pool, type, nullptr, batchSize, children));
std::make_shared<RowVector>(pool_, type, nullptr, batchSize, children));
}
return std::make_shared<core::ValuesNode>(nextPlanNodeId(), vectors);
}
Expand Down
13 changes: 1 addition & 12 deletions velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,7 @@ class SubstraitVeloxPlanConverter {
/// Starts: the start positions in byte to read from the items.
/// Lengths: the lengths in byte to read from the items.
std::shared_ptr<const core::PlanNode> toVeloxPlan(
const ::substrait::ReadRel& sRead,
u_int32_t& index,
std::vector<std::string>& paths,
std::vector<u_int64_t>& starts,
std::vector<u_int64_t>& lengths);
const ::substrait::ReadRel& sRead);

/// Used to convert Substrait Rel into Velox PlanNode.
std::shared_ptr<const core::PlanNode> toVeloxPlan(
Expand Down Expand Up @@ -122,13 +118,6 @@ class SubstraitVeloxPlanConverter {
return splitInfoMap_;
}

/// Looks up a function by ID and returns function name if found. Throws if
/// function with specified ID doesn't exist. Returns a compound
/// function specification consisting of the function name and the input
/// types. The format is as follows: <function
/// name>:<arg_type0>_<arg_type1>_..._<arg_typeN>
const std::string& findFunction(uint64_t id) const;

/// Used to insert a certain plan node as input. The plan node
/// id will start from the one that was set.
void insertInputNode(
Expand Down
7 changes: 1 addition & 6 deletions velox/substrait/SubstraitToVeloxPlanValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,7 @@ bool SubstraitToVeloxPlanValidator::validate(
bool SubstraitToVeloxPlanValidator::validate(
const ::substrait::ReadRel& sRead) {
try {
u_int32_t index;
std::vector<std::string> paths;
std::vector<u_int64_t> starts;
std::vector<u_int64_t> lengths;

planConverter_->toVeloxPlan(sRead, index, paths, starts, lengths);
planConverter_->toVeloxPlan(sRead);
} catch (const VeloxException& err) {
std::cout << "ReadRel validation failed due to:" << err.message()
<< std::endl;
Expand Down
34 changes: 26 additions & 8 deletions velox/substrait/tests/PlanConversionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/functions/prestosql/aggregates/AverageAggregate.h"
#include "velox/functions/prestosql/aggregates/CountAggregate.h"
#include "velox/functions/sparksql/Register.h"
#include "velox/substrait/SubstraitToVeloxPlan.h"
#include "velox/type/Type.h"
#include "velox/type/tests/FilterBuilder.h"
Expand All @@ -53,12 +56,14 @@ class PlanConversionTest : public virtual HiveConnectorTestBase,
u_int32_t index,
const std::vector<std::string>& paths,
const std::vector<u_int64_t>& starts,
const std::vector<u_int64_t>& lengths)
const std::vector<u_int64_t>& lengths,
const dwio::common::FileFormat& format)
: planNode_(planNode),
index_(index),
paths_(paths),
starts_(starts),
lengths_(lengths) {
lengths_(lengths),
format_(format) {
// Construct the splits.
std::vector<std::shared_ptr<facebook::velox::connector::ConnectorSplit>>
connectorSplits;
Expand All @@ -71,7 +76,7 @@ class PlanConversionTest : public virtual HiveConnectorTestBase,
facebook::velox::connector::hive::HiveConnectorSplit>(
facebook::velox::exec::test::kHiveConnectorId,
path,
facebook::velox::dwio::common::FileFormat::ORC,
format,
start,
length);
connectorSplits.emplace_back(split);
Expand Down Expand Up @@ -130,6 +135,7 @@ class PlanConversionTest : public virtual HiveConnectorTestBase,
std::vector<std::string> paths_;
std::vector<u_int64_t> starts_;
std::vector<u_int64_t> lengths_;
dwio::common::FileFormat format_;
uint64_t numRows_ = 0;
bool mayHasNext_ = true;
RowVectorPtr result_;
Expand All @@ -154,9 +160,16 @@ class PlanConversionTest : public virtual HiveConnectorTestBase,
// Convert to Velox PlanNode.
auto planNode = planConverter->toVeloxPlan(subPlan);

auto splitInfos = planConverter->splitInfos();
auto leafPlanNodeIds = planNode->leafPlanNodeIds();
// Only one leaf node is expected here.
EXPECT_EQ(1, leafPlanNodeIds.size());
auto iter = leafPlanNodeIds.begin();
auto splitInfo = splitInfos[*iter].get();

// Get the information for TableScan.
u_int32_t partitionIndex = planConverter->getPartitionIndex();
std::vector<std::string> paths = planConverter->getPaths();
u_int32_t partitionIndex = splitInfo->partitionIndex;
std::vector<std::string> paths = splitInfo->paths;

// In test, need to get the absolute path of the generated ORC file.
auto tempPath = getTmpDirPath();
Expand All @@ -167,11 +180,12 @@ class PlanConversionTest : public virtual HiveConnectorTestBase,
absolutePaths.emplace_back(fmt::format("file://{}{}", tempPath, path));
}

std::vector<u_int64_t> starts = planConverter->getStarts();
std::vector<u_int64_t> lengths = planConverter->getLengths();
std::vector<u_int64_t> starts = splitInfo->starts;
std::vector<u_int64_t> lengths = splitInfo->lengths;
auto format = splitInfo->format;
// Construct the result iterator.
auto resIter = std::make_shared<WholeComputeResultIterator>(
planNode, partitionIndex, absolutePaths, starts, lengths);
planNode, partitionIndex, absolutePaths, starts, lengths, format);
return resIter;
}

Expand All @@ -190,6 +204,10 @@ class PlanConversionTest : public virtual HiveConnectorTestBase,
// Per-test setup: stores the test parameter in useAsyncCache_, runs the
// HiveConnectorTestBase setup, then registers the sum, avg and count
// aggregates.
void SetUp() override {
useAsyncCache_ = GetParam();
HiveConnectorTestBase::SetUp();

// Register the aggregate functions under the names "sum", "avg" and "count".
aggregate::registerSumAggregate<aggregate::SumAggregate>("sum");
aggregate::registerAverageAggregate("avg");
aggregate::registerCountAggregate("count");
}

static void SetUpTestCase() {
Expand Down

0 comments on commit 4d6848c

Please sign in to comment.