From d5e9be7194fd15adac53c837abfcc10b947797b9 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Thu, 24 Jan 2019 11:18:08 +0100 Subject: [PATCH] [SPARK-26680][SQL] Eagerly create inputVars while conditions are appropriate ## What changes were proposed in this pull request? When a user passes a Stream to groupBy, ```CodegenSupport.consume``` ends up lazily generating ```inputVars``` from a Stream, since the field ```output``` will be a Stream. At the time ```output.zipWithIndex.map``` is called, conditions are correct. However, by the time the map operation actually executes, conditions are no longer appropriate. The closure used by the map operation ends up using a reference to the partially created ```inputVars```. As a result, a StackOverflowError occurs. This PR ensures that ```inputVars``` is eagerly created while conditions are appropriate. It seems this was also an issue with the code path for creating ```inputVars``` from ```outputVars``` (SPARK-25767). I simply extended the solution for that code path to encompass both code paths. ## How was this patch tested? SQL unit tests new test python tests Closes #23617 from bersprockets/SPARK-26680_opt1. Authored-by: Bruce Robbins Signed-off-by: Herman van Hovell --- .../spark/sql/execution/WholeStageCodegenExec.scala | 12 +++++++----- .../spark/sql/execution/WholeStageCodegenSuite.scala | 9 +++++++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 3b0a99669ccd0..d3a93f5eb3951 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -143,14 +143,11 @@ trait CodegenSupport extends SparkPlan { * Note that `outputVars` and `row` can't both be null. */ final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = { - val inputVars = + val inputVarsCandidate = if (outputVars != null) { assert(outputVars.length == output.length) // outputVars will be used to generate the code for UnsafeRow, so we should copy them - outputVars.map(_.copy()) match { - case stream: Stream[ExprCode] => stream.force - case other => other - } + outputVars.map(_.copy()) } else { assert(row != null, "outputVars and row cannot both be null.") ctx.currentVars = null @@ -160,6 +157,11 @@ trait CodegenSupport extends SparkPlan { } } + val inputVars = inputVarsCandidate match { + case stream: Stream[ExprCode] => stream.force + case other => other + } + val rowVar = prepareRowVar(ctx, row, outputVars) // Set up the `currentVars` in the codegen context, as we generate the code of `inputVars` diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 09ad0fdd66369..e03f084171623 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -330,4 +330,13 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { checkAnswer(abc, Row(1, "a")) } + + test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") { + val groupByCols = Stream(col("key")) + val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value") + .groupBy(groupByCols: _*) + .max("value") + + checkAnswer(df, Seq(Row(1, 3), Row(2, 3))) + } }