diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala index 4b64ae357b9..1b769c3496b 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala @@ -657,6 +657,35 @@ private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHel } } + def getSubsumeeOutputList(subsumeeOutputList: Seq[NamedExpression], + subsumerOutputList: Seq[NamedExpression]): Seq[NamedExpression] = { + // Map the subsumee output attribute with subsumer attribute and + // return the final projection list. For example, if mv is created with alias column, c1 as e1 + // and the actual user query is just c1, then we have to return c1 as output. + var projOutputList: Seq[NamedExpression] = Seq.empty + subsumeeOutputList.foreach { + case attrRef: AttributeReference => + // get the mapped subsumer output, in the above example case, it will return c1 as e1 + var output = subsumerOutputList.find { + case Alias(attr: AttributeReference, _) => + attrRef.semanticEquals(attr) + case attr: AttributeReference => + attrRef.semanticEquals(attr) + case exp: Expression => + exp.semanticEquals(attrRef) + }.getOrElse(attrRef) + // replace alias with subsumee output, example case, it will be c1 as c1 + output = output match { + case Alias(attr: AttributeReference, _) => + Alias(attrRef, attr.name)(exprId = attrRef.exprId) + case _ => output + } + projOutputList = projOutputList.:+(output) + case exp: Expression => projOutputList = projOutputList.:+(exp) + } + projOutputList + } + def apply(subsumer: ModularPlan, subsumee: ModularPlan, compensation: Option[ModularPlan], @@ -755,10 +784,41 @@ private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHel if (r2eJoinsMatch) { if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && 
isLOEmLOR) { + // check if the projection output list is same as subsumer and if not get the output + // list based on subsumer. If already same, avoid updating the output list, to avoid + // rewriting the plan + val isAllProjectsAreSame = sel_1q.outputList.forall(expr => + sel_1a.outputList.exists { + case alias@Alias(child, name) => + expr match { + case Alias(exprChild, exprName) => + exprChild.semanticEquals(child) && name.equalsIgnoreCase(exprName) + case _: NamedExpression => + alias.semanticEquals(expr) + } + case exp: NamedExpression => + expr.semanticEquals(exp) + }) + val sel1qOutputList = if (!isAllProjectsAreSame) { + getSubsumeeOutputList(sel_1q.outputList, sel_1a.outputList) + } else { + Seq.empty + } if (sel_1q.flagSpec.isEmpty) { - Seq(sel_1a) + if (sel1qOutputList.isEmpty) { + Seq(sel_1a) + } else { + Seq(sel_1a.copy(outputList = sel1qOutputList)) + } } else { - Seq(sel_1a.copy(flags = sel_1q.flags, flagSpec = sel_1q.flagSpec)) + if (sel1qOutputList.isEmpty) { + Seq(sel_1a.copy(flags = sel_1q.flags, + flagSpec = sel_1q.flagSpec)) + } else { + Seq(sel_1a.copy(flags = sel_1q.flags, + flagSpec = sel_1q.flagSpec, + outputList = sel1qOutputList)) + } } } else { // no compensation needed @@ -808,7 +868,9 @@ private object SelectSelectNoChildDelta extends MVMatchPattern with PredicateHel } else { Seq.empty }) + val sel1qOutputList = getSubsumeeOutputList(sel_1q.outputList, usel_1a.outputList) val sel_1q_temp = sel_1q.copy( + outputList = sel1qOutputList, predicateList = tPredicateList, children = tChildren, joinEdges = tJoinEdges.filter(_ != null), diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala index a4aa1c5e54b..91b0750d235 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala +++ 
b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala @@ -290,6 +290,31 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { sql(s"drop materialized view mv1") } + test("test create materialized view with simple and same projection with alias name") { + sql("drop materialized view if exists mv_alias") + // case 1: alias in mv create query and actual query with no alias + sql("create materialized view mv_alias as select empname as e1, designation from fact_table1") + var df1 = sql("select empname,designation from fact_table1") + assert(df1.queryExecution.sparkPlan.output.toList.head.toString().contains("empname")) + assert(TestUtil.verifyMVHit(df1.queryExecution.optimizedPlan, "mv_alias")) + checkAnswer(df1, sql("select empname,designation from fact_table2")) + + val df2 = sql("select empname as e2,designation from fact_table1") + assert(df2.queryExecution.sparkPlan.output.toList.head.toString().contains("e2")) + assert(TestUtil.verifyMVHit(df2.queryExecution.optimizedPlan, "mv_alias")) + checkAnswer(df2, sql("select empname,designation from fact_table2")) + + sql(s"drop materialized view mv_alias") + + // case 2: alias in actual query and mv query with no alias + sql("create materialized view mv_alias as select empname, designation from fact_table1") + df1 = sql("select empname as e1 ,designation from fact_table1") + assert(df1.queryExecution.sparkPlan.output.toList.head.toString().contains("e1")) + assert(TestUtil.verifyMVHit(df1.queryExecution.optimizedPlan, "mv_alias")) + checkAnswer(df1, sql("select empname,designation from fact_table2")) + sql(s"drop materialized view mv_alias") + } + test("test create materialized view with simple and sub projection") { sql("drop materialized view if exists mv2") sql("create materialized view mv2 as select empname, designation from fact_table1")