From ca9eff68424511fa11cc2bd695f1fddaae178e3c Mon Sep 17 00:00:00 2001 From: 10129659 Date: Wed, 2 Aug 2017 11:48:21 +0800 Subject: [PATCH 01/20] The wholestage codegen will be slower when the function is too long --- .../expressions/codegen/CodeGenerator.scala | 10 ++++ .../apache/spark/sql/internal/SQLConf.scala | 9 +++ .../sql/execution/WholeStageCodegenExec.scala | 6 ++ .../benchmark/AggregateBenchmark.scala | 55 +++++++++++++++++++ 4 files changed, 80 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index a014e2aa34820..57ea62a4777f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -355,6 +355,16 @@ class CodegenContext { */ private val placeHolderToComments = new mutable.HashMap[String, String] + /** + * Returns the length of codegen function is too long or not + */ + def existTooLongFunction(): Boolean = { + classFunctions.exists { case (className,functions) => + functions.exists{ case (name, code) => + CodeFormatter.stripExtraNewLines(code).count(_ == '\n') > SQLConf.get.maxFunctionLength + } + } + } /** * Returns a term name that is unique within this instance of a `CodegenContext`. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a819cddcae988..f05b8f810aaab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -572,6 +572,13 @@ object SQLConf { "disable logging or -1 to apply no limit.") .createWithDefault(1000) + val WHOLESTAGE_MAX_FUNCTION_LEN = buildConf("spark.sql.codegen.MaxFunctionLength") + .internal() + .doc("The maximum number of function length that will be supported before" + + " deactivating whole-stage codegen.") + .intConf + .createWithDefault(1500) + val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.") .longConf @@ -1014,6 +1021,8 @@ class SQLConf extends Serializable with Logging { def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES) + def maxFunctionLength: Int = getConf(WHOLESTAGE_MAX_FUNCTION_LEN) + def tableRelationCacheSize: Int = getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 34134db278ad8..247e995d43617 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -370,6 +370,12 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co override def doExecute(): RDD[InternalRow] = { val (ctx, cleanedSource) = doCodeGen() + val existLongFunction = ctx.existTooLongFunction + if (existLongFunction){ + logWarning(s"Function is too long, Whole-stage codegen disabled for this plan:\n " + + s"$treeString") + return child.execute() + } // try to compile and fallback if it failed try { CodeGenerator.compile(cleanedSource) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index 8a798fb444696..995b5211d771c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -301,6 +301,61 @@ class AggregateBenchmark extends BenchmarkBase { */ } + ignore("max function length of wholestagecodegen") { + val N = 20 << 15 + + val benchmark = new Benchmark("max function length of wholestagecodegen", N) + def f(): Unit = sparkSession.range(N) + .selectExpr( + "id", + "(id & 1023) as k1", + "cast(id & 1023 as double) as k2", + "cast(id & 1023 as int) as k3", + "case when id > 100 and id <= 200 then 1 else 0 end as v1", + "case when id > 200 and id <= 300 then 1 else 0 end as v2", + "case when id > 300 and id <= 400 then 1 else 0 end as v3", + "case when id > 400 and id <= 500 then 1 else 0 end as v4", + "case when id > 500 and id <= 600 then 1 else 0 end as v5", + "case when id > 600 and id <= 700 then 1 else 0 end as v6", + "case when id > 700 and id <= 800 then 1 else 0 end as v7", + "case when id > 800 and id <= 900 then 1 else 0 end as v8", + "case when id > 900 and id <= 1000 then 1 else 0 end as v9", + "case when id > 1000 and id <= 1100 then 1 else 0 end as v10", + "case when id > 1100 and id <= 1200 then 1 else 0 end as v11", + "case when id > 1200 and id <= 1300 then 1 else 0 end as v12", + "case when id > 1300 and id <= 1400 then 1 else 0 end as v13", + "case when id > 1400 and id <= 1500 then 1 else 0 end as v14", + "case when id > 1500 and id <= 1600 then 1 else 0 end as v15", + "case when id > 1600 and id <= 1700 then 1 else 0 end as v16", + "case when id > 1700 and id <= 1800 then 1 else 0 end as v17", + "case when id > 1800 and id <= 1900 then 1 else 0 end as v18") + .groupBy("k1", "k2", "k3") + .sum() + .collect() + + benchmark.addCase(s"codegen = F") { iter => + sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") + f() + } + + benchmark.addCase(s"codegen = T") { iter => + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.MaxFunctionLength", "10000") + f() + } + + benchmark.run() + + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1 + Intel64 Family 6 Model 58 Stepping 9, GenuineIntel + max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + codegen = F 443 / 507 1.5 676.0 1.0X + codegen = T 3279 / 3283 0.2 5002.6 0.1X + */ + } + ignore("cube") { val N = 5 << 20 From 1b0ac5ed896136df3579a61d7ef93980c0647e97 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Wed, 2 Aug 2017 12:41:24 +0800 Subject: [PATCH 02/20] The wholestage codegen will be slower when the function is too long --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +- .../org/apache/spark/sql/execution/WholeStageCodegenExec.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 57ea62a4777f1..6c96a6712c75d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -359,7 +359,7 @@ class CodegenContext { * Returns the length of codegen function is too long or not */ def existTooLongFunction(): Boolean = { - classFunctions.exists { case (className,functions) => + classFunctions.exists { case (className, functions) => functions.exists{ case (name, code) => CodeFormatter.stripExtraNewLines(code).count(_ == '\n') > SQLConf.get.maxFunctionLength } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 247e995d43617..b18335006ef68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -371,7 +371,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co override def doExecute(): RDD[InternalRow] = { val (ctx, cleanedSource) = doCodeGen() val existLongFunction = ctx.existTooLongFunction - if (existLongFunction){ + if (existLongFunction) { logWarning(s"Function is too long, Whole-stage codegen disabled for this plan:\n " + s"$treeString") return child.execute() From 5582f008ee1ef706242b88b898f9f5fd7a9947ab Mon Sep 17 00:00:00 2001 From: 10129659 Date: Mon, 7 Aug 2017 14:49:35 +0800 Subject: [PATCH 03/20] Modify annotation of existTooLongFunction --- .../sql/execution/benchmark/AggregateBenchmark.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index 995b5211d771c..077c165a9a247 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -340,7 +340,7 @@ class AggregateBenchmark extends BenchmarkBase { benchmark.addCase(s"codegen = T") { iter => sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.MaxFunctionLength", "10000") + sparkSession.conf.set("spark.sql.codegen.MaxFunctionLength", "1000") f() } @@ -349,10 +349,10 @@ class AggregateBenchmark extends BenchmarkBase { /* Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1 Intel64 Family 6 Model 58 Stepping 9, GenuineIntel - max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - codegen = F 443 / 507 1.5 676.0 1.0X - codegen = T 3279 / 3283 0.2 5002.6 0.1X + max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ---------------------------------------------------------------------------------------------- + codegen = F 443 / 507 1.5 676.0 1.0X + codegen = T 3279 / 3283 0.2 5002.6 0.1X */ } From 7c185c6da0ff5dc7261b6c39ddedd80667231c4c Mon Sep 17 00:00:00 2001 From: 10129659 Date: Mon, 7 Aug 2017 14:53:02 +0800 Subject: [PATCH 04/20] Modify annotation of existTooLongFunction --- .../spark/sql/execution/benchmark/AggregateBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index 077c165a9a247..debd547065cce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -340,7 +340,7 @@ class AggregateBenchmark extends BenchmarkBase { benchmark.addCase(s"codegen = T") { iter => sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.MaxFunctionLength", "1000") + sparkSession.conf.set("spark.sql.codegen.MaxFunctionLength", "10000") f() } From 7e84753ca9befc8f3cea872250b2145e132ac837 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Mon, 7 Aug 2017 14:55:46 +0800 Subject: [PATCH 05/20] Modify annotation of existTooLongFunction --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 6c96a6712c75d..3f56d2aacf14b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -357,6 +357,8 @@ class CodegenContext { /** * Returns the length of codegen function is too long or not + * It will count the lines of every codegen function, if there is a function of length + * greater than spark.sql.codegen.MaxFunctionLength, it will return true. */ def existTooLongFunction(): Boolean = { classFunctions.exists { case (className, functions) => From c4235dcdedd4ed55affbd4200680c774c116f97f Mon Sep 17 00:00:00 2001 From: 10129659 Date: Mon, 7 Aug 2017 15:15:12 +0800 Subject: [PATCH 06/20] Modify annotation of existTooLongFunction --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 3f56d2aacf14b..e94553b819545 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -356,7 +356,7 @@ class CodegenContext { private val placeHolderToComments = new mutable.HashMap[String, String] /** - * Returns the length of codegen function is too long or not + * Returns if the length of codegen function is too long or not * It will count the lines of every codegen function, if there is a function of length * greater than spark.sql.codegen.MaxFunctionLength, it will return true. */ From 52da6b27bcabeba5ccb7d1b1e572be9abbfc8c46 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Mon, 7 Aug 2017 15:56:10 +0800 Subject: [PATCH 07/20] Modify annotation of existTooLongFunction --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f05b8f810aaab..8c88e69bb9b5b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -574,8 +574,8 @@ object SQLConf { val WHOLESTAGE_MAX_FUNCTION_LEN = buildConf("spark.sql.codegen.MaxFunctionLength") .internal() - .doc("The maximum number of function length that will be supported before" + - " deactivating whole-stage codegen.") + .doc("The maximum lines of a function that will be supported before" + + " deactivating whole-stage codegen.") .intConf .createWithDefault(1500) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index b18335006ef68..e8f93a2b76a8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -372,7 +372,10 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co val (ctx, cleanedSource) = doCodeGen() val existLongFunction = ctx.existTooLongFunction if (existLongFunction) { - logWarning(s"Function is too long, Whole-stage codegen disabled for this plan:\n " + logWarning(s"Found too long generated codes and JIT optimization might not work, " + + s"Whole-stage codegen disabled for this plan, " + + s"You can change the config spark.sql.codegen.MaxFunctionLength " + + s"to adjust the function length limit:\n " + s"$treeString") return child.execute() } From d0c753a5d3f5fbb5e14da0eebbd5e9bd3778126c Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 10 Aug 2017 09:00:44 +0800 Subject: [PATCH 08/20] count lines of function without comments --- .../expressions/codegen/CodeFormatter.scala | 8 +++++ .../expressions/codegen/CodeGenerator.scala | 7 ++-- .../apache/spark/sql/internal/SQLConf.scala | 4 +-- .../codegen/CodeFormatterSuite.scala | 32 +++++++++++++++++++ .../benchmark/AggregateBenchmark.scala | 2 +- 5 files changed, 47 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala index 60e600d8dbd8f..dd2aa443a9136 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala @@ -89,6 +89,14 @@ object CodeFormatter { } new CodeAndComment(code.result().trim(), map) } + + def stripExtraNewLinesAndComments(input: String): String = { + val commentReg = + ("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" + // strip /*comment*/ + """([ |\t]*?\/\/[\s\S]*?\n)""").r // strip //comment + val codeWithoutComment = commentReg.replaceAllIn(input, "") + codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // strip ExtraNewLines + } } private class CodeFormatter { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index e94553b819545..69272c865876f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -356,14 +356,15 @@ class CodegenContext { private val placeHolderToComments = new mutable.HashMap[String, String] /** - * Returns if the length of codegen function is too long or not + * Returns if there is a codegen function the lines of which is greater than maxLinesPerFunction * It will count the lines of every codegen function, if there is a function of length - * greater than spark.sql.codegen.MaxFunctionLength, it will return true. + * greater than spark.sql.codegen.maxLinesPerFunction, it will return true. */ def existTooLongFunction(): Boolean = { classFunctions.exists { case (className, functions) => functions.exists{ case (name, code) => - CodeFormatter.stripExtraNewLines(code).count(_ == '\n') > SQLConf.get.maxFunctionLength + val codeWithoutComments = CodeFormatter.stripExtraNewLinesAndComments(code) + codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8c88e69bb9b5b..96c2cea65c0d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -572,7 +572,7 @@ object SQLConf { "disable logging or -1 to apply no limit.") .createWithDefault(1000) - val WHOLESTAGE_MAX_FUNCTION_LEN = buildConf("spark.sql.codegen.MaxFunctionLength") + val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction") .internal() .doc("The maximum lines of a function that will be supported before" + " deactivating whole-stage codegen.") @@ -1021,7 +1021,7 @@ class SQLConf extends Serializable with Logging { def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES) - def maxFunctionLength: Int = getConf(WHOLESTAGE_MAX_FUNCTION_LEN) + def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION) def tableRelationCacheSize: Int = getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala index 9d0a41661beaa..a0f1a64b0ab08 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala @@ -53,6 +53,38 @@ class CodeFormatterSuite extends SparkFunSuite { assert(reducedCode.body === "/*project_c4*/") } + test("removing extra new lines and comments") { + val code = + """ + |/* + | * multi + | * line + | * comments + | */ + | + |public function() { + |/*comment*/ + | /*comment_with_space*/ + |code_body + |//comment + |code_body + | //comment_with_space + | + |code_body + |} + """.stripMargin + + val reducedCode = CodeFormatter.stripExtraNewLinesAndComments(code) + assert(reducedCode === + """ + |public function() { + |code_body + |code_body + |code_body + |} + """.stripMargin) + } + testCase("basic example") { """ |class A { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index debd547065cce..bf9da104a48d2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -340,7 +340,7 @@ class AggregateBenchmark extends BenchmarkBase { benchmark.addCase(s"codegen = T") { iter => sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") - sparkSession.conf.set("spark.sql.codegen.MaxFunctionLength", "10000") + sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "10000") f() } From d3238e9800f73b39b55e47419c5409b8111ea080 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 10 Aug 2017 09:17:01 +0800 Subject: [PATCH 09/20] Add benchmark with default value --- .../execution/benchmark/AggregateBenchmark.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index bf9da104a48d2..f3870be50ad9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -338,12 +338,18 @@ class AggregateBenchmark extends BenchmarkBase { f() } - benchmark.addCase(s"codegen = T") { iter => + benchmark.addCase(s"codegen = T maxLinesPerFunction = 10000") { iter => sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "10000") f() } + benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter => + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500") + f() + } + benchmark.run() /* @@ -351,8 +357,9 @@ class AggregateBenchmark extends BenchmarkBase { Intel64 Family 6 Model 58 Stepping 9, GenuineIntel max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------- - codegen = F 443 / 507 1.5 676.0 1.0X - codegen = T 3279 / 3283 0.2 5002.6 0.1X + codegen = F 462 / 533 1.4 704.4 1.0X + codegen = T maxLinesPerFunction = 10000 3444 / 3447 0.2 5255.3 0.1X + codegen = T maxLinesPerFunction = 1500 447 / 478 1.5 682.1 1.0X */ } From d44a2f8499b4f7b9235fd138349005a4e3c960a5 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 10 Aug 2017 09:36:07 +0800 Subject: [PATCH 10/20] Add benchmark with default value --- .../sql/execution/benchmark/AggregateBenchmark.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index f3870be50ad9d..c121104568144 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -355,11 +355,11 @@ class AggregateBenchmark extends BenchmarkBase { /* Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1 Intel64 Family 6 Model 58 Stepping 9, GenuineIntel - max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------- - codegen = F 462 / 533 1.4 704.4 1.0X - codegen = T maxLinesPerFunction = 10000 3444 / 3447 0.2 5255.3 0.1X - codegen = T maxLinesPerFunction = 1500 447 / 478 1.5 682.1 1.0X + codegen = F 462 / 533 1.4 704.4 1.0X + codegen = T maxLinesPerFunction = 10000 3444 / 3447 0.2 5255.3 0.1X + codegen = T maxLinesPerFunction = 1500 447 / 478 1.5 682.1 1.0X */ } From ce544a56dbeaa9fecb66706f3d2bad97280835bd Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 10 Aug 2017 13:11:52 +0800 Subject: [PATCH 11/20] Modified the doc --- .../expressions/codegen/CodeGenerator.scala | 13 +++++++------ .../org/apache/spark/sql/internal/SQLConf.scala | 5 +++-- .../spark/sql/execution/WholeStageCodegenExec.scala | 11 +++++------ .../execution/benchmark/AggregateBenchmark.scala | 8 ++++---- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 69272c865876f..807765c1e00a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -356,18 +356,19 @@ class CodegenContext { private val placeHolderToComments = new mutable.HashMap[String, String] /** - * Returns if there is a codegen function the lines of which is greater than maxLinesPerFunction - * It will count the lines of every codegen function, if there is a function of length - * greater than spark.sql.codegen.maxLinesPerFunction, it will return true. + * It will count the lines of every Java function generated by whole-stage codegen, + * if there is a function of length greater than spark.sql.codegen.maxLinesPerFunction, + * it will return true. */ - def existTooLongFunction(): Boolean = { - classFunctions.exists { case (className, functions) => - functions.exists{ case (name, code) => + def isTooLongGeneratedFunction: Boolean = { + classFunctions.values.exists { _.values.exists { + code => val codeWithoutComments = CodeFormatter.stripExtraNewLinesAndComments(code) codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction } } } + /** * Returns a term name that is unique within this instance of a `CodegenContext`. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 96c2cea65c0d1..d2bf413ec3d16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -574,8 +574,9 @@ object SQLConf { val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction") .internal() - .doc("The maximum lines of a function that will be supported before" + - " deactivating whole-stage codegen.") + .doc("The maximum lines of a single Java function generated by whole-stage codegen. " + + "When the generated function exceeds this threshold, " + + "the whole-stage codegen is deactivated for this subtree of the current query plan.") .intConf .createWithDefault(1500) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index e8f93a2b76a8b..bacb7090a70ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -370,12 +370,11 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co override def doExecute(): RDD[InternalRow] = { val (ctx, cleanedSource) = doCodeGen() - val existLongFunction = ctx.existTooLongFunction - if (existLongFunction) { - logWarning(s"Found too long generated codes and JIT optimization might not work, " + - s"Whole-stage codegen disabled for this plan, " + - s"You can change the config spark.sql.codegen.MaxFunctionLength " + - s"to adjust the function length limit:\n " + if (ctx.isTooLongGeneratedFunction) { + logWarning("Found too long generated codes and JIT optimization might not work, " + + "Whole-stage codegen disabled for this plan, " + + "You can change the config spark.sql.codegen.MaxFunctionLength " + + "to adjust the function length limit:\n " + s"$treeString") return child.execute() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index c121104568144..2c4b1c4c2ef11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -355,11 +355,11 @@ class AggregateBenchmark extends BenchmarkBase { /* Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1 Intel64 Family 6 Model 58 Stepping 9, GenuineIntel - max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------- - codegen = F 462 / 533 1.4 704.4 1.0X - codegen = T maxLinesPerFunction = 10000 3444 / 3447 0.2 5255.3 0.1X - codegen = T maxLinesPerFunction = 1500 447 / 478 1.5 682.1 1.0X + codegen = F 462 / 533 1.4 704.4 1.0X + codegen = T maxLinesPerFunction = 10000 3444 / 3447 0.2 5255.3 0.1X + codegen = T maxLinesPerFunction = 1500 447 / 478 1.5 682.1 1.0X */ } From 08f5ddf0442793a63beff7f9e3970fc8bb92a47d Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 10 Aug 2017 14:03:05 +0800 Subject: [PATCH 12/20] Align comments --- .../sql/catalyst/expressions/codegen/CodeFormatter.scala | 6 +++--- .../sql/execution/benchmark/AggregateBenchmark.scala | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala index dd2aa443a9136..7b398f424cead 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala @@ -92,10 +92,10 @@ object CodeFormatter { def stripExtraNewLinesAndComments(input: String): String = { val commentReg = - ("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" + // strip /*comment*/ - """([ |\t]*?\/\/[\s\S]*?\n)""").r // strip //comment + ("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" + // strip /*comment*/ + """([ |\t]*?\/\/[\s\S]*?\n)""").r // strip //comment val codeWithoutComment = commentReg.replaceAllIn(input, "") - codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // strip ExtraNewLines + codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // strip ExtraNewLines } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index 2c4b1c4c2ef11..691fa9ac5e1e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -355,11 +355,11 @@ class AggregateBenchmark extends BenchmarkBase { /* Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1 Intel64 Family 6 Model 58 Stepping 9, GenuineIntel - max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------- - codegen = F 462 / 533 1.4 704.4 1.0X - codegen = T maxLinesPerFunction = 10000 3444 / 3447 0.2 5255.3 0.1X - codegen = T maxLinesPerFunction = 1500 447 / 478 1.5 682.1 1.0X + codegen = F 462 / 533 1.4 704.4 1.0X + codegen = T maxLinesPerFunction = 10000 3444 / 3447 0.2 5255.3 0.1X + codegen = T maxLinesPerFunction = 1500 447 / 478 1.5 682.1 1.0X */ } From 4fbe5f862def8d7edbf1fd161c709de1fa40138e Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 10 Aug 2017 16:07:39 +0800 Subject: [PATCH 13/20] Add test case for isTooLongGeneratedFunction --- .../execution/WholeStageCodegenSuite.scala | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 183c68fd3c016..9857f1897cb55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -149,4 +149,75 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { assert(df.collect() === Array(Row(1), Row(2))) } } + + test("SPARK-21603 check there is a too long generated function") { + val ds = spark.range(10) + .selectExpr( + "id", + "(id & 1023) as k1", + "cast(id & 1023 as double) as k2", + "cast(id & 1023 as int) as k3", + "case when id > 100 and id <= 200 then 1 else 0 end as v1", + "case when id > 200 and id <= 300 then 1 else 0 end as v2", + "case when id > 300 and id <= 400 then 1 else 0 end as v3", + "case when id > 400 and id <= 500 then 1 else 0 end as v4", + "case when id > 500 and id <= 600 then 1 else 0 end as v5", + "case when id > 600 and id <= 700 then 1 else 0 end as v6", + "case when id > 700 and id <= 800 then 1 else 0 end as v7", + "case when id > 800 and id <= 900 then 1 else 0 end as v8", + "case when id > 900 and id <= 1000 then 1 else 0 end as v9", + "case when id > 1000 and id <= 1100 then 1 else 0 end as v10", + "case when id > 1100 and id <= 1200 then 1 else 0 end as v11", + "case when id > 1200 and id <= 1300 then 1 else 0 end as v12", + "case when id > 1300 and id <= 1400 then 1 else 0 end as v13", + "case when id > 1400 and id <= 1500 then 1 else 0 end as v14", + "case when id > 1500 and id <= 1600 then 1 else 0 end as v15", + "case when id > 1600 and id <= 1700 then 1 else 0 end as v16", + "case when id > 1700 and id <= 1800 then 1 else 0 end as v17", + "case when id > 1800 and id <= 1900 then 1 else 0 end as v18", + "case when id > 1900 and id <= 2000 then 1 else 0 end as v19", + "case when id > 2000 and id <= 2100 then 1 else 0 end as v20", + "case when id > 2100 and id <= 2200 then 1 else 0 end as v21", + "case when id > 2200 and id <= 2300 then 1 else 0 end as v22", + "case when id > 2300 and id <= 2400 then 1 else 0 end as v23", + "case when id > 2400 and id <= 2500 then 1 else 0 end as v24", + "case when id > 2500 and id <= 2600 then 1 else 0 end as v25", + "case when id > 2600 and id <= 2700 then 1 else 0 end as v26") + .groupBy("k1", "k2", "k3") + .sum() + val plan = ds.queryExecution.executedPlan + val wholeStageCodegenExec = plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec] && + p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[HashAggregateExec] + .child.isInstanceOf[ProjectExec] + ) + assert(wholeStageCodegenExec.isDefined) + val (ctx, cleanedSource) = + wholeStageCodegenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen() + assert(ctx.isTooLongGeneratedFunction === true) + } + + test("SPARK-21603 check there is not a too long generated function") { + val ds = spark.range(10) + .selectExpr( + "id", + "(id & 1023) as k1", + "cast(id & 1023 as double) as k2", + "cast(id & 1023 as int) as k3", + "case when id > 100 and id <= 200 then 1 else 0 end as v1") + .groupBy("k1", "k2", "k3") + .sum() + val plan = ds.queryExecution.executedPlan + val wholeStageCodegenExec = plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec] && + p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[HashAggregateExec] + .child.isInstanceOf[ProjectExec] + ) + assert(wholeStageCodegenExec.isDefined) + val (ctx, cleanedSource) = + wholeStageCodegenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen() + assert(ctx.isTooLongGeneratedFunction === false) + } } From 5c180ac71e9feba51b5681265c6834cd1963a793 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Thu, 10 Aug 2017 20:31:59 +0800 Subject: [PATCH 14/20] Add test case for isTooLongGeneratedFunction --- .../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 9857f1897cb55..1dca672cfd7cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -193,7 +193,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { .child.isInstanceOf[ProjectExec] ) assert(wholeStageCodegenExec.isDefined) - val (ctx, cleanedSource) = + val (ctx, _) = wholeStageCodegenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen() assert(ctx.isTooLongGeneratedFunction === true) } @@ -216,7 +216,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { .child.isInstanceOf[ProjectExec] ) assert(wholeStageCodegenExec.isDefined) - val (ctx, cleanedSource) = + val (ctx, _) = wholeStageCodegenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen() assert(ctx.isTooLongGeneratedFunction === false) } From b83cd1c2e291e2ec429125933db7db4f7c41a11b Mon Sep 17 00:00:00 2001 From: 10129659 Date: Fri, 11 Aug 2017 10:40:50 +0800 Subject: [PATCH 15/20] Modified test case --- .../execution/WholeStageCodegenSuite.scala | 102 ++++++++---------- 1 file changed, 42 insertions(+), 60 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 1dca672cfd7cb..62a26086565c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{Column, Dataset, Row} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec @@ -150,74 +151,55 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { } } - test("SPARK-21603 check there is a too long generated function") { + def genGroupByCodeGenContext(caseNum: Int, maxLinesPerFunction: Int): CodegenContext = { + val caseExp = (1 to caseNum).map { i => + s"case when id > $i and id <= ${i+1} then 1 else 0 end as v$i" + }.toList + + spark.conf.set("spark.sql.codegen.maxLinesPerFunction", maxLinesPerFunction) + + val keyExp = List( + "id", + "(id & 1023) as k1", + "cast(id & 1023 as double) as k2", + "cast(id & 1023 as int) as k3" + ) + val ds = spark.range(10) - .selectExpr( - "id", - "(id & 1023) as k1", - "cast(id & 1023 as double) as k2", - "cast(id & 1023 as int) as k3", - "case when id > 100 and id <= 200 then 1 else 0 end as v1", - "case when id > 200 and id <= 300 then 1 else 0 end as v2", - "case when id > 300 and id <= 400 then 1 else 0 end as v3", - "case when id > 400 and id <= 500 then 1 else 0 end as v4", - "case when id > 500 and id <= 600 then 1 else 0 end as v5", - "case when id > 600 and id <= 700 then 1 else 0 end as v6", - "case when id > 700 and id <= 800 then 1 else 0 end as v7", - "case when id > 800 and id <= 900 then 1 else 0 end as v8", - "case when id > 900 and id <= 1000 then 1 else 0 end as v9", - "case when id > 1000 and id <= 1100 then 1 else 0 end as v10", - "case when id > 1100 and id <= 1200 then 1 else 0 end as v11", - "case when id > 1200 and id <= 1300 then 1 else 0 end as v12", - "case when id > 1300 and id <= 1400 then 1 else 0 end as v13", - "case when id > 1400 and id <= 1500 then 1 else 0 end as v14", - "case when id > 1500 and id <= 1600 then 1 else 0 end as v15", - "case when id > 1600 and id <= 1700 then 1 else 0 end as v16", - "case when id > 1700 and id <= 1800 then 1 else 0 end as v17", - "case when id > 1800 and id <= 1900 then 1 else 0 end as v18", - "case when id > 1900 and id <= 2000 then 1 else 0 end as v19", - "case when id > 2000 and id <= 2100 then 1 else 0 end as v20", - "case when id > 2100 and id <= 2200 then 1 else 0 end as v21", - "case when id > 2200 and id <= 2300 then 1 else 0 end as v22", - "case when id > 2300 and id <= 2400 then 1 else 0 end as v23", - "case when id > 2400 and id <= 2500 then 1 else 0 end as v24", - "case when id > 2500 and id <= 2600 then 1 else 0 end as v25", - "case when id > 2600 and id <= 2700 then 1 else 0 end as v26") + .selectExpr(keyExp:::caseExp: _*) .groupBy("k1", "k2", "k3") .sum() val plan = ds.queryExecution.executedPlan - val wholeStageCodegenExec = plan.find(p => - p.isInstanceOf[WholeStageCodegenExec] && - p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec] && - p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[HashAggregateExec] - .child.isInstanceOf[ProjectExec] - ) - assert(wholeStageCodegenExec.isDefined) - val (ctx, _) = - wholeStageCodegenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen() + + val wholeStageCodeGenExec = plan.find(p => p match { + case wp: WholeStageCodegenExec => wp.child match { + case hp: HashAggregateExec if (hp.child.isInstanceOf[ProjectExec]) => true + case _ => false + } + case _ => false + }) + + assert(wholeStageCodeGenExec.isDefined) + wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._1 + } + + test("SPARK-21603 check there is a too long generated function") { + val ctx = genGroupByCodeGenContext(30, 1500) assert(ctx.isTooLongGeneratedFunction === true) } test("SPARK-21603 check there is not a too long generated function") { - val ds = spark.range(10) - .selectExpr( - "id", - "(id & 1023) as k1", - "cast(id & 1023 as double) as k2", - "cast(id & 1023 as int) as k3", - "case when id > 100 and id <= 200 then 1 else 0 end as v1") - .groupBy("k1", "k2", "k3") - .sum() - val plan = ds.queryExecution.executedPlan - val wholeStageCodegenExec = plan.find(p => - p.isInstanceOf[WholeStageCodegenExec] && - p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec] && - p.asInstanceOf[WholeStageCodegenExec].child.asInstanceOf[HashAggregateExec] - .child.isInstanceOf[ProjectExec] - ) - assert(wholeStageCodegenExec.isDefined) - val (ctx, _) = - wholeStageCodegenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen() + val ctx = genGroupByCodeGenContext(1, 1500) assert(ctx.isTooLongGeneratedFunction === false) } + + test("SPARK-21603 check there is not a too long generated function when threshold is Int.Max") { + val ctx = genGroupByCodeGenContext(30, Int.MaxValue) + assert(ctx.isTooLongGeneratedFunction === false) + } + + test("SPARK-21603 check there is not a too long generated function when threshold is 0") { + val ctx = genGroupByCodeGenContext(1, 0) + assert(ctx.isTooLongGeneratedFunction === true) + } } From 6814047231044627ee85aef458c108345e129bee Mon Sep 17 00:00:00 2001 From: 10129659 Date: Fri, 11 Aug 2017 10:54:36 +0800 Subject: [PATCH 16/20] Modified test case --- .../org/apache/spark/sql/execution/WholeStageCodegenSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 62a26086565c9..85133452a913c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -153,7 +153,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { def genGroupByCodeGenContext(caseNum: Int, maxLinesPerFunction: Int): CodegenContext = { val caseExp = (1 to caseNum).map { i => - s"case when id > $i and id <= ${i+1} then 1 else 0 end as v$i" + s"case when id > $i and id <= ${i + 1} then 1 else 0 end as v$i" }.toList spark.conf.set("spark.sql.codegen.maxLinesPerFunction", maxLinesPerFunction) From 8b32b54d0586b8878ea231919266e429f613e8c7 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Fri, 11 Aug 2017 12:41:07 +0800 Subject: [PATCH 17/20] Using withSQLConf --- .../execution/WholeStageCodegenSuite.scala | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 85133452a913c..fec37eb9080aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -151,13 +151,10 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { } } - def genGroupByCodeGenContext(caseNum: Int, maxLinesPerFunction: Int): CodegenContext = { + def genGroupByCodeGenContext(caseNum: Int): CodegenContext = { val caseExp = (1 to caseNum).map { i => s"case when id > $i and id <= ${i + 1} then 1 else 0 end as v$i" }.toList - - spark.conf.set("spark.sql.codegen.maxLinesPerFunction", maxLinesPerFunction) - val keyExp = List( "id", "(id & 1023) as k1", @@ -184,22 +181,30 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { } test("SPARK-21603 check there is a too long generated function") { - val ctx = genGroupByCodeGenContext(30, 1500) - assert(ctx.isTooLongGeneratedFunction === true) + withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") { + val ctx = genGroupByCodeGenContext(30) + assert(ctx.isTooLongGeneratedFunction === true) + } } test("SPARK-21603 check there is not a too long generated function") { - val ctx = genGroupByCodeGenContext(1, 1500) - assert(ctx.isTooLongGeneratedFunction === false) + withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") { + val ctx = genGroupByCodeGenContext(1) + assert(ctx.isTooLongGeneratedFunction === false) + } } test("SPARK-21603 check there is not a too long generated function when threshold is Int.Max") { - val ctx = genGroupByCodeGenContext(30, Int.MaxValue) - assert(ctx.isTooLongGeneratedFunction === false) + withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> Int.MaxValue.toString) { + val ctx = genGroupByCodeGenContext(30) + assert(ctx.isTooLongGeneratedFunction === false) + } } - test("SPARK-21603 check there is not a too long generated function when threshold is 0") { - val ctx = genGroupByCodeGenContext(1, 0) - assert(ctx.isTooLongGeneratedFunction === true) + test("SPARK-21603 check there is a too long generated function when threshold is 0") { + withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "0") { + val ctx = genGroupByCodeGenContext(1) + assert(ctx.isTooLongGeneratedFunction === true) + } } } From 32813b095760600ac79b8a466d892a24090e34e8 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Fri, 11 Aug 2017 14:48:18 +0800 Subject: [PATCH 18/20] Using withSQLConf --- .../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index fec37eb9080aa..beeee6a97c8dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -159,8 +159,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { "id", "(id & 1023) as k1", "cast(id & 1023 as double) as k2", - "cast(id & 1023 as int) as k3" - ) + "cast(id & 1023 as int) as k3") val ds = spark.range(10) .selectExpr(keyExp:::caseExp: _*) From b879dbf3eb69f7ad40a8405acd92d11212bcb3b2 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Sat, 12 Aug 2017 09:16:18 +0800 Subject: [PATCH 19/20] Modified the default value to Int.Max --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d2bf413ec3d16..ba07bc49cb69f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -578,7 +578,7 @@ object SQLConf { "When the generated function exceeds this threshold, " + "the whole-stage codegen is deactivated for this subtree of the current query plan.") .intConf - .createWithDefault(1500) + .createWithDefault(Int.MaxValue) val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.") From 44ce894fdc311febbac04fb70448c0081d0f4253 Mon Sep 17 00:00:00 2001 From: 10129659 Date: Sat, 12 Aug 2017 09:58:47 +0800 Subject: [PATCH 20/20] Modified the default value to 8000/3 --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ba07bc49cb69f..a0b8364e91c92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -576,9 +576,11 @@ object SQLConf { .internal() .doc("The maximum lines of a single Java function generated by whole-stage codegen. " + "When the generated function exceeds this threshold, " + - "the whole-stage codegen is deactivated for this subtree of the current query plan.") + "the whole-stage codegen is deactivated for this subtree of the current query plan. " + + "The default value 2667 is the max length of byte code JIT supported " + + "for a single function(8000) divided by 3.") .intConf - .createWithDefault(Int.MaxValue) + .createWithDefault(2667) val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.")