From 04808772ca8fae9efb89a45b4fddda7eff52f180 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Thu, 7 Jul 2022 13:25:50 +0800 Subject: [PATCH] Removed special handling for avg (#31) --- velox/substrait/SubstraitToVeloxExpr.cpp | 38 +++- velox/substrait/SubstraitToVeloxExpr.h | 5 + velox/substrait/SubstraitToVeloxPlan.cpp | 240 +++-------------------- velox/substrait/SubstraitToVeloxPlan.h | 13 +- 4 files changed, 70 insertions(+), 226 deletions(-) diff --git a/velox/substrait/SubstraitToVeloxExpr.cpp b/velox/substrait/SubstraitToVeloxExpr.cpp index bad3e09ecc44..a6753c93a202 100644 --- a/velox/substrait/SubstraitToVeloxExpr.cpp +++ b/velox/substrait/SubstraitToVeloxExpr.cpp @@ -103,6 +103,30 @@ SubstraitVeloxExprConverter::toExtractExpr( VELOX_FAIL("Constant is expected to be the first parameter in extract."); } +std::shared_ptr +SubstraitVeloxExprConverter::toRowConstructorExpr( + const std::vector>& params, + const std::string& typeName) { + std::vector structTypeNames; + subParser_->getSubFunctionTypes(typeName, structTypeNames); + VELOX_CHECK( + structTypeNames.size() > 0, "At lease one type name is expected."); + + // Preparation for the conversion from struct types to RowType. + std::vector rowTypes; + std::vector names; + for (int idx = 0; idx < structTypeNames.size(); idx++) { + std::string substraitTypeName = structTypeNames[idx]; + names.emplace_back("col_" + std::to_string(idx)); + rowTypes.emplace_back(std::move(toVeloxType(substraitTypeName))); + } + + return std::make_shared( + ROW(std::move(names), std::move(rowTypes)), + std::move(params), + "row_constructor"); +} + std::shared_ptr SubstraitVeloxExprConverter::toVeloxExpr( const ::substrait::Expression::ScalarFunction& sFunc, @@ -114,21 +138,23 @@ SubstraitVeloxExprConverter::toVeloxExpr( } const auto& veloxFunction = subParser_->findVeloxFunction(functionMap_, sFunc.function_reference()); - const auto& veloxType = - toVeloxType(subParser_->parseType(sFunc.output_type())->type); + std::string typeName = subParser_->parseType(sFunc.output_type())->type; if (veloxFunction == "extract") { - return toExtractExpr(params, veloxType); + return toExtractExpr(std::move(params), toVeloxType(typeName)); } if (veloxFunction == "alias") { - return toAliasExpr(params); + return toAliasExpr(std::move(params)); } if (veloxFunction == "is_not_null") { - return toIsNotNullExpr(params, veloxType); + return toIsNotNullExpr(std::move(params), toVeloxType(typeName)); + } + if (veloxFunction == "row_constructor") { + return toRowConstructorExpr(std::move(params), typeName); } return std::make_shared( - veloxType, std::move(params), veloxFunction); + toVeloxType(typeName), std::move(params), veloxFunction); } std::shared_ptr diff --git a/velox/substrait/SubstraitToVeloxExpr.h b/velox/substrait/SubstraitToVeloxExpr.h index 8077fd23369a..0e264eedca8a 100644 --- a/velox/substrait/SubstraitToVeloxExpr.h +++ b/velox/substrait/SubstraitToVeloxExpr.h @@ -73,6 +73,11 @@ class SubstraitVeloxExprConverter { const std::vector>& params, const TypePtr& outputType); + /// Create expression for row_constructor. + std::shared_ptr toRowConstructorExpr( + const std::vector>& params, + const std::string& typeName); + /// Used to convert Substrait Literal into Velox Expression. std::shared_ptr toVeloxExpr( const ::substrait::Expression::Literal& substraitLit); diff --git a/velox/substrait/SubstraitToVeloxPlan.cpp b/velox/substrait/SubstraitToVeloxPlan.cpp index aafc2b998eb2..3ca64662ab03 100644 --- a/velox/substrait/SubstraitToVeloxPlan.cpp +++ b/velox/substrait/SubstraitToVeloxPlan.cpp @@ -192,175 +192,15 @@ std::shared_ptr SubstraitVeloxPlanConverter::toVeloxPlan( std::shared_ptr childNode; if (sAgg.has_input()) { childNode = toVeloxPlan(sAgg.input()); - } else { VELOX_FAIL("Child Rel is expected in AggregateRel."); } + core::AggregationNode::Step aggStep; - // Get aggregation phase and check if there are input columns need to be - // combined into row. - if (needsRowConstruct(sAgg, aggStep)) { - return toVeloxAggWithRowConstruct(sAgg, childNode, aggStep); - } + setPhase(sAgg, aggStep); return toVeloxAgg(sAgg, childNode, aggStep); } -std::shared_ptr -SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct( - const ::substrait::AggregateRel& sAgg, - const std::shared_ptr& childNode, - const core::AggregationNode::Step& aggStep) { - // Will add a Project node before Aggregate node to combine columns into - // row. - std::vector> constructExprs; - const auto& groupings = sAgg.groupings(); - const auto& constructInputType = childNode->outputType(); - - // Handle groupings. - uint32_t groupingOutIdx = 0; - for (const auto& grouping : groupings) { - const auto& groupingExprs = grouping.grouping_expressions(); - for (const auto& groupingExpr : groupingExprs) { - // Velox's groupings are limited to be Field. - auto fieldExpr = exprConverter_->toVeloxExpr( - groupingExpr.selection(), constructInputType); - constructExprs.push_back(fieldExpr); - groupingOutIdx += 1; - } - } - - std::vector aggExprs; - auto aggMeasureSize = sAgg.measures().size(); - aggExprs.reserve(aggMeasureSize); - - std::vector aggregateMasks; - aggregateMasks.reserve(sAgg.measures().size()); - - // Construct Velox Aggregate expressions. - for (const auto& measure : sAgg.measures()) { - core::FieldAccessTypedExprPtr aggregateMask; - ::substrait::Expression substraitAggMask = measure.filter(); - // Get Aggregation Masks. - if (measure.has_filter()) { - if (substraitAggMask.ByteSizeLong() == 0) { - aggregateMask = {}; - } else { - aggregateMask = - std::dynamic_pointer_cast( - exprConverter_->toVeloxExpr( - substraitAggMask, constructInputType)); - } - aggregateMasks.push_back(aggregateMask); - } - } - // Handle aggregations. - std::vector aggFuncNames; - aggFuncNames.reserve(sAgg.measures().size()); - std::vector aggOutTypes; - aggOutTypes.reserve(sAgg.measures().size()); - - for (const auto& smea : sAgg.measures()) { - const auto& aggFunction = smea.measure(); - std::string funcName = subParser_->findVeloxFunction( - functionMap_, aggFunction.function_reference()); - aggFuncNames.emplace_back(funcName); - aggOutTypes.emplace_back( - toVeloxType(subParser_->parseType(aggFunction.output_type())->type)); - if (funcName == "avg") { - // Will use row constructor to combine the sum and count columns into - // row. - if (aggFunction.args().size() != 2) { - VELOX_FAIL("Final average should have two args."); - } - std::vector> aggParams; - aggParams.reserve(aggFunction.args().size()); - for (const auto& arg : aggFunction.args()) { - aggParams.emplace_back( - exprConverter_->toVeloxExpr(arg, constructInputType)); - } - auto constructExpr = std::make_shared( - ROW({"sum", "count"}, {DOUBLE(), BIGINT()}), - std::move(aggParams), - "row_constructor"); - constructExprs.emplace_back(constructExpr); - } else { - if (aggFunction.args().size() != 1) { - VELOX_FAIL("Expect only one arg."); - } - for (const auto& arg : aggFunction.args()) { - constructExprs.emplace_back( - exprConverter_->toVeloxExpr(arg, constructInputType)); - } - } - } - // Get the output names of row construct. - std::vector constructOutNames; - constructOutNames.reserve(constructExprs.size()); - for (uint32_t colIdx = 0; colIdx < constructExprs.size(); colIdx++) { - constructOutNames.emplace_back( - subParser_->makeNodeName(planNodeId_, colIdx)); - } - - uint32_t totalOutColNum = constructExprs.size(); - // Create the row construct node. - auto constructNode = std::make_shared( - nextPlanNodeId(), - std::move(constructOutNames), - std::move(constructExprs), - childNode); - - // Create the Aggregation node. - bool ignoreNullKeys = false; - std::vector> - preGroupingExprs = {}; - - // Get the output names of Aggregate node. - std::vector aggOutNames; - aggOutNames.reserve(totalOutColNum - groupingOutIdx); - for (uint32_t idx = groupingOutIdx; idx < totalOutColNum; idx++) { - aggOutNames.emplace_back(subParser_->makeNodeName(planNodeId_, idx)); - } - - const auto& constructOutType = constructNode->outputType(); - for (uint32_t colIdx = groupingOutIdx; colIdx < totalOutColNum; colIdx++) { - std::vector> aggArgs; - aggArgs.reserve(1); - // Use the colIdx to access the columns after grouping columns. - aggArgs.emplace_back(std::make_shared( - constructOutType->childAt(colIdx), constructOutType->names()[colIdx])); - // Use the another index to access the types and names of aggregation - // columns. - aggExprs.emplace_back(std::make_shared( - aggOutTypes[colIdx - groupingOutIdx], - std::move(aggArgs), - aggFuncNames[colIdx - groupingOutIdx])); - } - - // Get the grouping expressions. - std::vector> groupingExprs; - groupingExprs.reserve(groupingOutIdx); - for (uint32_t colIdx = 0; colIdx < groupingOutIdx; colIdx++) { - // Velox's groupings are limited to be Field. - groupingExprs.emplace_back( - std::make_shared( - constructOutType->childAt(colIdx), - constructOutType->names()[colIdx])); - } - - // Create the Aggregation node. - auto aggNode = std::make_shared( - nextPlanNodeId(), - aggStep, - groupingExprs, - preGroupingExprs, - aggOutNames, - aggExprs, - aggregateMasks, - ignoreNullKeys, - constructNode); - return aggNode; -} - std::shared_ptr SubstraitVeloxPlanConverter::toVeloxAgg( const ::substrait::AggregateRel& sAgg, const std::shared_ptr& childNode, @@ -381,7 +221,7 @@ std::shared_ptr SubstraitVeloxPlanConverter::toVeloxAgg( } // Parse measures and get the aggregate expressions. - uint32_t aggOutIdx = groupingOutIdx; + // Each measure represents one aggregate expression. std::vector> aggExprs; aggExprs.reserve(sAgg.measures().size()); for (const auto& smea : sAgg.measures()) { @@ -395,33 +235,22 @@ std::shared_ptr SubstraitVeloxPlanConverter::toVeloxAgg( } auto aggVeloxType = toVeloxType(subParser_->parseType(aggFunction.output_type())->type); - if (funcName == "avg") { - // Will used sum and count to calculate the partial avg. - auto sumExpr = std::make_shared( - aggVeloxType, aggParams, "sum"); - auto countExpr = std::make_shared( - BIGINT(), aggParams, "count"); - aggExprs.emplace_back(sumExpr); - aggExprs.emplace_back(countExpr); - aggOutIdx += 2; - } else { - auto aggExpr = std::make_shared( - aggVeloxType, std::move(aggParams), funcName); - aggExprs.emplace_back(aggExpr); - aggOutIdx += 1; - } + auto aggExpr = std::make_shared( + aggVeloxType, std::move(aggParams), funcName); + aggExprs.emplace_back(aggExpr); } bool ignoreNullKeys = false; std::vector> aggregateMasks( - aggOutIdx - groupingOutIdx); + sAgg.measures().size()); std::vector> preGroupingExprs = {}; // Get the output names of Aggregation. std::vector aggOutNames; - aggOutNames.reserve(aggOutIdx - groupingOutIdx); - for (int idx = groupingOutIdx; idx < aggOutIdx; idx++) { + aggOutNames.reserve(sAgg.measures().size()); + for (int idx = groupingOutIdx; idx < groupingOutIdx + sAgg.measures().size(); + idx++) { aggOutNames.emplace_back(subParser_->makeNodeName(planNodeId_, idx)); } @@ -819,41 +648,34 @@ std::string SubstraitVeloxPlanConverter::findFuncSpec(uint64_t id) { return subParser_->findSubstraitFuncSpec(functionMap_, id); } -bool SubstraitVeloxPlanConverter::needsRowConstruct( +void SubstraitVeloxPlanConverter::setPhase( const ::substrait::AggregateRel& sAgg, core::AggregationNode::Step& aggStep) { if (sAgg.measures().size() == 0) { // When only groupings exist, set the phase to be Single. aggStep = core::AggregationNode::Step::kSingle; - return false; + return; } - for (const auto& smea : sAgg.measures()) { - auto aggFunction = smea.measure(); - std::string funcName = subParser_->findVeloxFunction( - functionMap_, aggFunction.function_reference()); - // Set the aggregation phase. - switch (aggFunction.phase()) { - case ::substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE: - aggStep = core::AggregationNode::Step::kPartial; - break; - case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE: - aggStep = core::AggregationNode::Step::kIntermediate; - break; - case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT: - aggStep = core::AggregationNode::Step::kFinal; - // Only Final Average needs row construct currently. - if (funcName == "avg") { - return true; - } - break; - case ::substrait::AGGREGATION_PHASE_INITIAL_TO_RESULT: - aggStep = core::AggregationNode::Step::kSingle; - break; - default: - throw std::runtime_error("Aggregate phase is not supported."); - } + + // Use the first measure to set aggregation phase. + const auto& smea = sAgg.measures()[0]; + const auto& aggFunction = smea.measure(); + switch (aggFunction.phase()) { + case ::substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE: + aggStep = core::AggregationNode::Step::kPartial; + break; + case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE: + aggStep = core::AggregationNode::Step::kIntermediate; + break; + case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT: + aggStep = core::AggregationNode::Step::kFinal; + break; + case ::substrait::AGGREGATION_PHASE_INITIAL_TO_RESULT: + aggStep = core::AggregationNode::Step::kSingle; + break; + default: + VELOX_FAIL("Aggregate phase is not supported."); } - return false; } core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan( diff --git a/velox/substrait/SubstraitToVeloxPlan.h b/velox/substrait/SubstraitToVeloxPlan.h index 1efc3fac9797..35c06ec107ea 100644 --- a/velox/substrait/SubstraitToVeloxPlan.h +++ b/velox/substrait/SubstraitToVeloxPlan.h @@ -335,20 +335,11 @@ class SubstraitVeloxPlanConverter { const std::vector<::substrait::Expression_ScalarFunction>& remainingFunctions); - /// Used to check if some of the input columns of Aggregation - /// should be combined into a single column. Currently, this case occurs in - /// final Average. The phase of Aggregation will also be set. - bool needsRowConstruct( + /// Set the phase of Aggregation. + void setPhase( const ::substrait::AggregateRel& sAgg, core::AggregationNode::Step& aggStep); - /// Used to convert AggregateRel into Velox plan node. - /// This method will add a Project node before Aggregation to combine columns. - std::shared_ptr toVeloxAggWithRowConstruct( - const ::substrait::AggregateRel& sAgg, - const std::shared_ptr& childNode, - const core::AggregationNode::Step& aggStep); - /// Used to convert AggregateRel into Velox plan node. /// The output of child node will be used as the input of Aggregation. std::shared_ptr toVeloxAgg(