[SPARK-26680][SPARK-25767][SQL][BACKPORT-2.3] Eagerly create inputVars while conditions are appropriate

## What changes were proposed in this pull request?

Backport of #22789 and #23617 to branch-2.3.

When a user passes a Stream to groupBy, `CodegenSupport.consume` ends up lazily generating `inputVars` from a Stream, since the field `output` will be a Stream. At the time `output.zipWithIndex.map` is called, conditions are correct. However, by the time the map operation actually executes, conditions are no longer appropriate. The closure used by the map operation ends up using a reference to the partially created `inputVars`. As a result, a StackOverflowError occurs.

This PR ensures that `inputVars` is eagerly created while conditions are appropriate. It seems this was also an issue with the code path for creating `inputVars` from `outputVars` (SPARK-25767). I simply extended the solution for that code path to encompass both code paths.
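The hazard described above can be reproduced outside Spark: a Scala 2 `Stream`'s `map` is strict only in its head, so the mapping closure for later elements runs after surrounding mutable state may have changed, while `force` materializes every element up front. A minimal sketch of the idea (plain Scala 2, hypothetical names, not Spark code):

```scala
object LazyStreamDemo {
  // Mutable state standing in for the codegen "conditions".
  var state: String = "valid"

  // Mirrors the fix: materialize a possibly-lazy Seq before state changes.
  def makeVars(names: Seq[String]): Seq[String] = {
    val candidate = names.zipWithIndex.map { case (n, i) => s"$n@$i:$state" }
    candidate match {
      case s: Stream[String] => s.force // evaluate eagerly, like the patch
      case other => other
    }
  }

  def main(args: Array[String]): Unit = {
    val vars = makeVars(Stream("a", "b"))
    state = "stale" // conditions change after the call returns
    // Thanks to force, every element was computed while state was "valid".
    println(vars.mkString(","))
  }
}
```

Without the `force`, only the head of the Stream would be evaluated during `makeVars`; the second element's closure would run later and observe the stale state.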

## How was this patch tested?

SQL unit tests
new test
python tests

Closes #23642 from bersprockets/SPARK-26680_branch23.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
bersprockets authored and dongjoon-hyun committed Jan 25, 2019
1 parent ded902c commit 373a627
Showing 2 changed files with 26 additions and 1 deletion.
```diff
@@ -140,7 +140,7 @@ trait CodegenSupport extends SparkPlan {
    * Note that `outputVars` and `row` can't both be null.
    */
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
-    val inputVars =
+    val inputVarsCandidate =
       if (outputVars != null) {
         assert(outputVars.length == output.length)
         // outputVars will be used to generate the code for UnsafeRow, so we should copy them
@@ -154,6 +154,11 @@ trait CodegenSupport extends SparkPlan {
       }
     }

+    val inputVars = inputVarsCandidate match {
+      case stream: Stream[ExprCode] => stream.force
+      case other => other
+    }
+
     val rowVar = prepareRowVar(ctx, row, outputVars)

     // Set up the `currentVars` in the codegen context, as we generate the code of `inputVars`
```
```diff
@@ -319,4 +319,24 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       assert(df.limit(1).collect() === Array(Row("bat", 8.0)))
     }
   }
+
+  test("SPARK-25767: Lazy evaluated stream of expressions handled correctly") {
+    val a = Seq(1).toDF("key")
+    val b = Seq((1, "a")).toDF("key", "value")
+    val c = Seq(1).toDF("key")
+
+    val ab = a.join(b, Stream("key"), "left")
+    val abc = ab.join(c, Seq("key"), "left")
+
+    checkAnswer(abc, Row(1, "a"))
+  }
+
+  test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") {
+    val groupByCols = Stream(col("key"))
+    val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value")
+      .groupBy(groupByCols: _*)
+      .max("value")
+
+    checkAnswer(df, Seq(Row(1, 3), Row(2, 3)))
+  }
 }
```
