Skip to content

Commit

Permalink
[SPARK-20534][SQL] Make outer generate exec return empty rows
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
Generate exec does not produce `null` values if the generator for the input row is empty and the generate operates in outer mode without join. This is caused by the fact that the `join=false` code path is different from the `join=true` code path, and that the `join=false` code path did deal with outer properly. This PR addresses this issue.

## How was this patch tested?
Updated `outer*` tests in `GeneratorFunctionSuite`.

Author: Herman van Hovell <[email protected]>

Closes #17810 from hvanhovell/SPARK-20534.
  • Loading branch information
hvanhovell authored and gatorsmile committed May 1, 2017
1 parent f0169a1 commit 6b44c4d
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
g.copy(child = prunedChild(g.child, g.references))

// Turn off `join` for Generate if no column from it's child is used
case p @ Project(_, g: Generate)
if g.join && !g.outer && p.references.subsetOf(g.generatedSet) =>
case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
p.copy(child = g.copy(join = false))

// Eliminate unneeded attributes from right side of a Left Existence Join.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
* @param join when true, each output row is implicitly joined with the input tuple that produced
* it.
* @param outer when true, each input row will be output at least once, even if the output of the
* given `generator` is empty. `outer` has no effect when `join` is false.
* given `generator` is empty.
* @param qualifier Qualifier for the attributes of generator(UDTF)
* @param generatorOutput The output schema of the Generator.
* @param child Children logical plan node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow])
extends Iterator[InternalRow] {

lazy val results = func().toIterator
lazy val results: Iterator[InternalRow] = func().toIterator
override def hasNext: Boolean = results.hasNext
override def next(): InternalRow = results.next()
}
Expand All @@ -50,7 +50,7 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
* @param join when true, each output row is implicitly joined with the input tuple that produced
* it.
* @param outer when true, each input row will be output at least once, even if the output of the
* given `generator` is empty. `outer` has no effect when `join` is false.
* given `generator` is empty.
* @param generatorOutput the qualified output attributes of the generator of this node, which
* constructed in analysis phase, and we can not change it, as the
* parent node bound with it already.
Expand Down Expand Up @@ -78,15 +78,15 @@ case class GenerateExec(

override def outputPartitioning: Partitioning = child.outputPartitioning

val boundGenerator = BindReferences.bindReference(generator, child.output)
val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)

protected override def doExecute(): RDD[InternalRow] = {
// boundGenerator.terminate() should be triggered after all of the rows in the partition
val rows = if (join) {
child.execute().mapPartitionsInternal { iter =>
val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
val rows = if (join) {
val joinedRow = new JoinedRow

iter.flatMap { row =>
// we should always set the left (child output)
joinedRow.withLeft(row)
Expand All @@ -101,18 +101,21 @@ case class GenerateExec(
// keep it the same as Hive does
joinedRow.withRight(row)
}
} else {
iter.flatMap { row =>
val outputRows = boundGenerator.eval(row)
if (outer && outputRows.isEmpty) {
Seq(generatorNullRow)
} else {
outputRows
}
} ++ LazyIterator(boundGenerator.terminate)
}
} else {
child.execute().mapPartitionsInternal { iter =>
iter.flatMap(boundGenerator.eval) ++ LazyIterator(boundGenerator.terminate)
}
}

val numOutputRows = longMetric("numOutputRows")
rows.mapPartitionsWithIndexInternal { (index, iter) =>
// Convert the rows to unsafe rows.
val proj = UnsafeProjection.create(output, output)
proj.initialize(index)
iter.map { r =>
rows.map { r =>
numOutputRows += 1
proj(r)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
checkAnswer(
df.select(explode_outer('intList)),
Row(1) :: Row(2) :: Row(3) :: Nil)
Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)
}

test("single posexplode") {
Expand All @@ -105,7 +105,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
checkAnswer(
df.select(posexplode_outer('intList)),
Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Nil)
Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(null, null) :: Nil)
}

test("explode and other columns") {
Expand Down Expand Up @@ -161,7 +161,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {

checkAnswer(
df.select(explode_outer('intList).as('int)).select('int),
Row(1) :: Row(2) :: Row(3) :: Nil)
Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)

checkAnswer(
df.select(explode('intList).as('int)).select(sum('int)),
Expand All @@ -182,7 +182,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {

checkAnswer(
df.select(explode_outer('map)),
Row("a", "b") :: Row("c", "d") :: Nil)
Row("a", "b") :: Row(null, null) :: Row("c", "d") :: Nil)
}

test("explode on map with aliases") {
Expand All @@ -198,7 +198,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {

checkAnswer(
df.select(explode_outer('map).as("key1" :: "value1" :: Nil)).select("key1", "value1"),
Row("a", "b") :: Nil)
Row("a", "b") :: Row(null, null) :: Nil)
}

test("self join explode") {
Expand Down Expand Up @@ -279,7 +279,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
)
checkAnswer(
df2.selectExpr("inline_outer(col1)"),
Row(3, "4") :: Row(5, "6") :: Nil
Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil
)
}

Expand Down

0 comments on commit 6b44c4d

Please sign in to comment.