Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-8332][VL] Support explode/posexplode outer #8333

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@ case class CHGenerateExecTransformer(
override protected def withNewChildInternal(newChild: SparkPlan): CHGenerateExecTransformer =
copy(generator, requiredChildOutput, outer, generatorOutput, newChild)

override protected def doGeneratorValidate(
generator: Generator,
outer: Boolean): ValidationResult =
override protected def doGeneratorValidate(generator: Generator): ValidationResult =
ValidationResult.succeeded

override protected def getRelNode(
context: SubstraitContext,
inputRel: RelNode,
generatorNode: ExpressionNode,
outer: Boolean,
validation: Boolean): RelNode = {
if (!validation) {
RelBuilder.makeGenerateRel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ case class GenerateExecTransformer(
override protected def withNewChildInternal(newChild: SparkPlan): GenerateExecTransformer =
copy(generator, requiredChildOutput, outer, generatorOutput, newChild)

override protected def doGeneratorValidate(
generator: Generator,
outer: Boolean): ValidationResult = {
if (!supportsGenerate(generator, outer)) {
override protected def doGeneratorValidate(generator: Generator): ValidationResult = {
if (!supportsGenerate(generator)) {
ValidationResult.failed(
s"Velox backend does not support this generator: ${generator.getClass.getSimpleName}" +
s", outer: $outer")
Expand All @@ -84,6 +82,7 @@ case class GenerateExecTransformer(
context: SubstraitContext,
inputRel: RelNode,
generatorNode: ExpressionNode,
outer: Boolean,
validation: Boolean): RelNode = {
val operatorId = context.nextOperatorId(this.nodeName)
RelBuilder.makeGenerateRel(
Expand Down Expand Up @@ -121,9 +120,12 @@ case class GenerateExecTransformer(
} else {
"0"
}
val isOuter = if (outer) "1" else "0"
parametersStr
.append("isPosExplode=")
.append(isPosExplode)
.append("isOuter=")
.append(isOuter)
.append("\n")

// isStack: 1 for Stack, 0 for others.
Expand Down Expand Up @@ -162,25 +164,20 @@ case class GenerateExecTransformer(
}

object GenerateExecTransformer {
def supportsGenerate(generator: Generator, outer: Boolean): Boolean = {
// TODO: supports outer and remove this param.
if (outer) {
false
} else {
generator match {
case _: Inline | _: ExplodeBase | _: JsonTuple | _: Stack =>
true
case _ =>
false
}
def supportsGenerate(generator: Generator): Boolean = {
generator match {
case _: Inline | _: ExplodeBase | _: JsonTuple | _: Stack =>
true
case _ =>
false
}
}
}

object PullOutGenerateProjectHelper extends PullOutProjectHelper {
val JSON_PATH_PREFIX = "$."
def pullOutPreProject(generate: GenerateExec): SparkPlan = {
if (GenerateExecTransformer.supportsGenerate(generate.generator, generate.outer)) {
if (GenerateExecTransformer.supportsGenerate(generate.generator)) {
generate.generator match {
case _: Inline | _: ExplodeBase =>
val expressionMap = new mutable.HashMap[Expression, NamedExpression]()
Expand Down Expand Up @@ -278,7 +275,7 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper {
}

def pullOutPostProject(generate: GenerateExec): SparkPlan = {
if (GenerateExecTransformer.supportsGenerate(generate.generator, generate.outer)) {
if (GenerateExecTransformer.supportsGenerate(generate.generator)) {
generate.generator match {
case PosExplode(_) =>
val originalOrdinal = generate.generatorOutput.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1976,4 +1976,54 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
}

test("test explode_outer && pos_explode_outer on array") {
Seq[(Int, Seq[Int])](
(1, Seq(1, 2, 3)),
(2, Seq()),
(3, Seq(3, null.asInstanceOf[Int])),
(4, null.asInstanceOf[Array[Int]])
).toDF("a", "intList").createTempView("t")

runQueryAndCompare("""
|SELECT
| a, explode_outer(intList)
| FROM t
|""".stripMargin) {
checkGlutenOperatorMatch[GenerateExecTransformer]
}

runQueryAndCompare("""
|SELECT
| a, posexplode_outer(intList)
| FROM t
|""".stripMargin) {
checkGlutenOperatorMatch[GenerateExecTransformer]
}
}

test("test explode_outer && pos_explode_outer on map") {
Seq[(Int, Map[String, String])](
(1, Map("a" -> "1", "b" -> "2")),
(2, Map()),
(3, Map("c" -> "3")),
(4, null.asInstanceOf[Map[String, String]])
).toDF("a", "mapData").createTempView("t")

runQueryAndCompare("""
|SELECT
| a, explode_outer(mapData)
| FROM t
|""".stripMargin) {
checkGlutenOperatorMatch[GenerateExecTransformer]
}

runQueryAndCompare("""
|SELECT
| a, posexplode_outer(mapData)
| FROM t
|""".stripMargin) {
checkGlutenOperatorMatch[GenerateExecTransformer]
}
}
}
8 changes: 7 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -827,8 +827,14 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
ordinalityName = std::make_optional<std::string>("pos");
}

bool isOuter = false;
if (generateRel.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(generateRel.advanced_extension(), "isOuter=")) {
isOuter = true;
}

return std::make_shared<core::UnnestNode>(
nextPlanNodeId(), replicated, unnest, std::move(unnestNames), ordinalityName, childNode);
nextPlanNodeId(), replicated, unnest, std::move(unnestNames), ordinalityName, childNode, isOuter);
}

const core::WindowNode::Frame SubstraitToVeloxPlanConverter::createWindowFrame(
Expand Down
4 changes: 2 additions & 2 deletions ep/build-velox/src/get_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

set -exu

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=2024_12_09
VELOX_REPO=https://github.com/WangGuangxin/velox.git
VELOX_BRANCH=2024_12_09-explode
VELOX_HOME=""

OS=`uname -s`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ abstract class GenerateExecTransformerBase(
child: SparkPlan)
extends UnaryTransformSupport {

protected def doGeneratorValidate(generator: Generator, outer: Boolean): ValidationResult
protected def doGeneratorValidate(generator: Generator): ValidationResult

protected def getRelNode(
context: SubstraitContext,
inputRel: RelNode,
generatorNode: ExpressionNode,
outer: Boolean,
validation: Boolean): RelNode

protected lazy val requiredChildOutputNodes: Seq[ExpressionNode] = {
Expand All @@ -66,19 +67,20 @@ abstract class GenerateExecTransformerBase(
override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)

override protected def doValidateInternal(): ValidationResult = {
val validationResult = doGeneratorValidate(generator, outer)
val validationResult = doGeneratorValidate(generator)
if (!validationResult.ok()) {
return validationResult
}
val context = new SubstraitContext
val relNode =
getRelNode(context, null, getGeneratorNode(context), validation = true)
getRelNode(context, null, getGeneratorNode(context), outer, validation = true)
doNativeValidation(context, relNode)
}

override protected def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val relNode = getRelNode(context, childCtx.root, getGeneratorNode(context), validation = false)
val relNode =
getRelNode(context, childCtx.root, getGeneratorNode(context), outer, validation = false)
TransformContext(output, relNode)
}

Expand Down
Loading