From 597933ca11b05fe52513f092f8eb63d5ae921ee1 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Sat, 13 Apr 2024 08:34:37 +0800 Subject: [PATCH 1/3] support explode outer --- .../execution/CHGenerateExecTransformer.scala | 1 + .../execution/GenerateExecTransformer.scala | 31 ++++++------ .../gluten/execution/MiscOperatorSuite.scala | 50 +++++++++++++++++++ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 8 ++- .../GenerateExecTransformerBase.scala | 10 ++-- 5 files changed, 78 insertions(+), 22 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala index fc7da0a6ddee..a97d487bf361 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala @@ -70,6 +70,7 @@ case class CHGenerateExecTransformer( context: SubstraitContext, inputRel: RelNode, generatorNode: ExpressionNode, + outer: Boolean, validation: Boolean): RelNode = { if (!validation) { RelBuilder.makeGenerateRel( diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala index a81d812d9cde..73802a2599ee 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala @@ -68,10 +68,8 @@ case class GenerateExecTransformer( override protected def withNewChildInternal(newChild: SparkPlan): GenerateExecTransformer = copy(generator, requiredChildOutput, outer, generatorOutput, newChild) - override protected def doGeneratorValidate( - generator: Generator, - outer: Boolean): ValidationResult = { - if (!supportsGenerate(generator, outer)) { + override protected def doGeneratorValidate(generator: Generator): ValidationResult = { + if (!supportsGenerate(generator)) { ValidationResult.failed( s"Velox backend does not support this generator: ${generator.getClass.getSimpleName}" + s", outer: $outer") @@ -84,6 +82,7 @@ case class GenerateExecTransformer( context: SubstraitContext, inputRel: RelNode, generatorNode: ExpressionNode, + outer: Boolean, validation: Boolean): RelNode = { val operatorId = context.nextOperatorId(this.nodeName) RelBuilder.makeGenerateRel( @@ -121,9 +120,12 @@ case class GenerateExecTransformer( } else { "0" } + val isOuter = if (outer) "1" else "0" parametersStr .append("isPosExplode=") .append(isPosExplode) + .append("isOuter=") + .append(isOuter) .append("\n") // isStack: 1 for Stack, 0 for others. @@ -162,17 +164,12 @@ case class GenerateExecTransformer( } object GenerateExecTransformer { - def supportsGenerate(generator: Generator, outer: Boolean): Boolean = { - // TODO: supports outer and remove this param. - if (outer) { - false - } else { - generator match { - case _: Inline | _: ExplodeBase | _: JsonTuple | _: Stack => - true - case _ => - false - } + def supportsGenerate(generator: Generator): Boolean = { + generator match { + case _: Inline | _: ExplodeBase | _: JsonTuple | _: Stack => + true + case _ => + false } } } @@ -180,7 +177,7 @@ object GenerateExecTransformer { object PullOutGenerateProjectHelper extends PullOutProjectHelper { val JSON_PATH_PREFIX = "$." def pullOutPreProject(generate: GenerateExec): SparkPlan = { - if (GenerateExecTransformer.supportsGenerate(generate.generator, generate.outer)) { + if (GenerateExecTransformer.supportsGenerate(generate.generator)) { generate.generator match { case _: Inline | _: ExplodeBase => val expressionMap = new mutable.HashMap[Expression, NamedExpression]() @@ -278,7 +275,7 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper { } def pullOutPostProject(generate: GenerateExec): SparkPlan = { - if (GenerateExecTransformer.supportsGenerate(generate.generator, generate.outer)) { + if (GenerateExecTransformer.supportsGenerate(generate.generator)) { generate.generator match { case PosExplode(_) => val originalOrdinal = generate.generatorOutput.head diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index 8063a5d12207..358ae3d8fa5a 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -1976,4 +1976,54 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } } + + test("test explode_outer && pos_explode_outer on array") { + Seq[(Int, Seq[Int])]( + (1, Seq(1, 2, 3)), + (2, Seq()), + (3, Seq(3, null.asInstanceOf[Int])), + (4, null.asInstanceOf[Array[Int]]) + ).toDF("a", "intList").createTempView("t") + + runQueryAndCompare(""" + |SELECT + | a, explode_outer(intList) + | FROM t + |""".stripMargin) { + checkGlutenOperatorMatch[GenerateExecTransformer] + } + + runQueryAndCompare(""" + |SELECT + | a, posexplode_outer(intList) + | FROM t + |""".stripMargin) { + checkGlutenOperatorMatch[GenerateExecTransformer] + } + } + + test("test explode_outer && pos_explode_outer on map") { + Seq[(Int, Map[String, String])]( + (1, Map("a" -> "1", "b" -> "2")), + (2, Map()), + (3, Map("c" -> "3")), + (4, null.asInstanceOf[Map[String, String]]) + ).toDF("a", "mapData").createTempView("t") + + runQueryAndCompare(""" + |SELECT + | a, explode_outer(mapData) + | FROM t + |""".stripMargin) { + checkGlutenOperatorMatch[GenerateExecTransformer] + } + + runQueryAndCompare(""" + |SELECT + | a, posexplode_outer(mapData) + | FROM t + |""".stripMargin) { + checkGlutenOperatorMatch[GenerateExecTransformer] + } + } } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 5870c4ef9f35..ab532b858652 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -827,8 +827,14 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: ordinalityName = std::make_optional("pos"); } + bool isOuter = false; + if (generateRel.has_advanced_extension() && + SubstraitParser::configSetInOptimization(generateRel.advanced_extension(), "isOuter=")) { + isOuter = true; + } + return std::make_shared( - nextPlanNodeId(), replicated, unnest, std::move(unnestNames), ordinalityName, childNode); + nextPlanNodeId(), replicated, unnest, std::move(unnestNames), ordinalityName, childNode, isOuter); } const core::WindowNode::Frame SubstraitToVeloxPlanConverter::createWindowFrame( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GenerateExecTransformerBase.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GenerateExecTransformerBase.scala index 698d1f14c5b9..d6cd304d6e88 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GenerateExecTransformerBase.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GenerateExecTransformerBase.scala @@ -40,12 +40,13 @@ abstract class GenerateExecTransformerBase( child: SparkPlan) extends UnaryTransformSupport { - protected def doGeneratorValidate(generator: Generator, outer: Boolean): ValidationResult + protected def doGeneratorValidate(generator: Generator): ValidationResult protected def getRelNode( context: SubstraitContext, inputRel: RelNode, generatorNode: ExpressionNode, + outer: Boolean, validation: Boolean): RelNode protected lazy val requiredChildOutputNodes: Seq[ExpressionNode] = { @@ -66,19 +67,20 @@ abstract class GenerateExecTransformerBase( override def producedAttributes: AttributeSet = AttributeSet(generatorOutput) override protected def doValidateInternal(): ValidationResult = { - val validationResult = doGeneratorValidate(generator, outer) + val validationResult = doGeneratorValidate(generator) if (!validationResult.ok()) { return validationResult } val context = new SubstraitContext val relNode = - getRelNode(context, null, getGeneratorNode(context), validation = true) + getRelNode(context, null, getGeneratorNode(context), outer, validation = true) doNativeValidation(context, relNode) } override protected def doTransform(context: SubstraitContext): TransformContext = { val childCtx = child.asInstanceOf[TransformSupport].transform(context) - val relNode = getRelNode(context, childCtx.root, getGeneratorNode(context), validation = false) + val relNode = + getRelNode(context, childCtx.root, getGeneratorNode(context), outer, validation = false) TransformContext(output, relNode) } From afaca3b3faf422fc124e9109d0f6b0523ad3533c Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Tue, 24 Dec 2024 23:19:30 +0800 Subject: [PATCH 2/3] test ci --- ep/build-velox/src/get_velox.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 7e00939e9be6..c5ab35c84d2e 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -16,8 +16,8 @@ set -exu -VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2024_12_09 +VELOX_REPO=https://github.com/WangGuangxin/velox.git +VELOX_BRANCH=2024_12_09-explode VELOX_HOME="" OS=`uname -s` From 2c2cbad209bbf8f53c18fcd406c8b1cc753bc949 Mon Sep 17 00:00:00 2001 From: "wangguangxin.cn" Date: Wed, 25 Dec 2024 07:55:01 +0800 Subject: [PATCH 3/3] fix clickhouse ci --- .../apache/gluten/execution/CHGenerateExecTransformer.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala index a97d487bf361..b064768674a7 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala @@ -61,9 +61,7 @@ case class CHGenerateExecTransformer( override protected def withNewChildInternal(newChild: SparkPlan): CHGenerateExecTransformer = copy(generator, requiredChildOutput, outer, generatorOutput, newChild) - override protected def doGeneratorValidate( - generator: Generator, - outer: Boolean): ValidationResult = + override protected def doGeneratorValidate(generator: Generator): ValidationResult = ValidationResult.succeeded override protected def getRelNode(