[SPARK-21603][SQL] Whole-stage codegen can be much slower than when it is disabled if the generated function is too long #18810

Status: Closed (wants to merge 20 commits)
Changes from 18 commits
@@ -89,6 +89,14 @@ object CodeFormatter {
}
new CodeAndComment(code.result().trim(), map)
}

def stripExtraNewLinesAndComments(input: String): String = {
  val commentReg =
    ("""([ \t]*?\/\*[\s\S]*?\*\/[ \t]*?)|""" + // strip /* ... */ block comments
     """([ \t]*?\/\/[\s\S]*?\n)""").r          // strip // line comments
  val codeWithoutComment = commentReg.replaceAllIn(input, "")
  codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // collapse runs of blank lines
}
}

private class CodeFormatter {
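For illustration, a quick sketch of this helper's effect on a tiny, hypothetical input (not part of the PR; the real coverage is the CodeFormatterSuite test further down):

  import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter

  val raw = "a = 1 // inline\n\n/* block */\nb = 2\n"
  CodeFormatter.stripExtraNewLinesAndComments(raw)
  // returns "a = 1\nb = 2\n": both comment styles and the blank line are removed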
@@ -355,6 +355,20 @@ class CodegenContext {
*/
private val placeHolderToComments = new mutable.HashMap[String, String]

/**
 * Counts the lines of every Java function generated by whole-stage codegen and
 * returns true if any function is longer than
 * spark.sql.codegen.maxLinesPerFunction lines, ignoring comments and blank lines.
 */
def isTooLongGeneratedFunction: Boolean = {
  classFunctions.values.exists(_.values.exists { code =>
    val codeWithoutComments = CodeFormatter.stripExtraNewLinesAndComments(code)
    codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction
  })
}

gatorsmile (Member, Aug 10, 2017): Add one more empty line

eatoncys (Contributor Author): Ok, added, thanks

/**
* Returns a term name that is unique within this instance of a `CodegenContext`.
*/
@@ -572,6 +572,14 @@ object SQLConf {
"disable logging or -1 to apply no limit.")
.createWithDefault(1000)

val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction")
  .internal()
  .doc("The maximum number of lines allowed in a single Java function generated by " +
    "whole-stage codegen. When a generated function exceeds this threshold, " +
    "whole-stage codegen is deactivated for this subtree of the current query plan.")
  .intConf
  .createWithDefault(1500)
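For reference, the threshold could then be tuned per session through the standard runtime-conf API (a hedged sketch; the key is the one defined above, the value is illustrative):

  // e.g. raise the limit for the current session; Int.MaxValue would effectively disable the check
  spark.conf.set("spark.sql.codegen.maxLinesPerFunction", "10000")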
Member: Would it be possible to explain why 1500 is a good default value?

Member: I'm not confident about this default value. Is it too small?

Member: I tend not to change the current behavior of whole-stage codegen; this might unintentionally stop some user code from running with whole-stage codegen. Shall we make -1 the default and skip the function-length check when this config is negative?
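A minimal sketch of that proposal (hypothetical; the PR as written compares against the threshold unconditionally, and lineCount stands for the stripped line count computed in isTooLongGeneratedFunction):

  // hypothetical: treat a negative threshold as "check disabled"
  val maxLines = SQLConf.get.maxLinesPerFunction
  val tooLong = maxLines >= 0 && lineCount > maxLines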

eatoncys (Contributor Author, Aug 10, 2017): When I modified it to 1600, the result was:
[screenshot of benchmark results omitted]

Member: Is this value (1500) effective for other huge programs?

eatoncys (Contributor Author): I think it applies to other Java programs running on the Java HotSpot VM.

Member: Or maybe Int.Max as the default value, so we won't change current behavior.

eatoncys (Contributor Author, Aug 11, 2017): @gatorsmile, which do you think is better for the default value, 1500 or Int.Max?

Member: I think this value depends on what code the whole-stage codegen generates for each query. In other words, when a generated Java method averages more than about 5.5 bytes of bytecode per line (8192 / 1500), 1500 would not work. @eatoncys, why are you sure that this can apply to other Java programs?

gatorsmile (Member, Aug 11, 2017): @eatoncys Let us do it in a more conservative way. 8192 / 3 at first?

eatoncys (Contributor Author, Aug 12, 2017): @kiszk, you're right, it depends on how many bytes of bytecode each line produces. @gatorsmile, ok, we will take the conservative value 2667 (8000 / 3) first; HotSpot's HugeMethodLimit is 8000, not 8192.
http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/share/vm/runtime/globals.hpp
[screenshot of benchmark results omitted]
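For context, the arithmetic behind that default (a back-of-envelope sketch; the 3-bytes-per-line bytecode density is the reviewers' conservative assumption, not a measured constant):

  // HotSpot's HugeMethodLimit: methods over 8000 bytes of bytecode are not JIT-compiled by default
  val hugeMethodLimitBytes = 8000
  val assumedBytecodeBytesPerLine = 3
  val conservativeDefault = hugeMethodLimitBytes / assumedBytecodeBytesPerLine // 2666, rounded up to 2667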


val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
.longConf
@@ -1014,6 +1022,8 @@ class SQLConf extends Serializable with Logging {

def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)

def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION)

def tableRelationCacheSize: Int =
getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)

@@ -53,6 +53,38 @@ class CodeFormatterSuite extends SparkFunSuite {
assert(reducedCode.body === "/*project_c4*/")
}

test("removing extra new lines and comments") {
val code =
"""
|/*
| * multi
| * line
| * comments
| */
|
|public function() {
|/*comment*/
| /*comment_with_space*/
|code_body
|//comment
|code_body
| //comment_with_space
|
|code_body
|}
""".stripMargin

val reducedCode = CodeFormatter.stripExtraNewLinesAndComments(code)
assert(reducedCode ===
"""
|public function() {
|code_body
|code_body
|code_body
|}
""".stripMargin)
}

testCase("basic example") {
"""
|class A {
@@ -370,6 +370,14 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {

override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
if (ctx.isTooLongGeneratedFunction) {
  logWarning("Found a too long generated function, so JIT optimization might not work. " +
    "Whole-stage codegen is disabled for this plan. You can change the config " +
    "spark.sql.codegen.maxLinesPerFunction to adjust the function length limit:\n" +
    s"$treeString")
gatorsmile (Member): This could be very big. Please follow what was done in #18658.

eatoncys (Contributor Author): @gatorsmile, thank you for the review. The treeString does not contain the generated code; it only contains the tree string of the physical plan, like:
*HashAggregate(keys=[k1#2395L, k2#2396, k3#2397], functions=[partial_sum(id#2392L)...
+- *Project [id#2392L, (id#2392L & 1023) AS k1#2395L, cast((id#2392L & 1023) as double) AS k2#2396...
   +- *Range (0, 655360, step=1, splits=1)
So I think it will not be very big.

return child.execute()
}
Member: We need to add a test that creates a query with a long generated function and checks that whole-stage codegen is disabled for it.

eatoncys (Contributor Author, Aug 10, 2017): I think it can be tested by the "max function length of wholestagecodegen" case added in AggregateBenchmark.scala, thanks.

Member: AggregateBenchmark is more like a benchmark than a test; it won't run every time. We need a test to prevent regressions from future changes.

eatoncys (Contributor Author): @viirya, it is hard for me to check whether whole-stage codegen is disabled or not; would you give me some suggestions? Thanks.

Member: We can check whether there is a WholeStageCodegenExec node in the physical plan of the query. WholeStageCodegenSuite has a few examples you can look at.
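A minimal sketch of that suggestion (an assumed test-body fragment; df stands for the query under test):

  // the plan should have fallen back, i.e. contain no WholeStageCodegenExec node
  val plan = df.queryExecution.executedPlan
  assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isEmpty)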

viirya (Member, Aug 10, 2017): Ok. I'll take a look later. Thanks.

Member: There are multiple ways to verify it. One solution is to use SQL metrics.

Member: I am fine with your proposed approach, but it needs to be simplified.

Member: @gatorsmile Do you mean the pipelineTime metric?

gatorsmile (Member): Yes.

// try to compile and fallback if it failed
try {
CodeGenerator.compile(cleanedSource)
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.{Column, Dataset, Row}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -149,4 +150,60 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
assert(df.collect() === Array(Row(1), Row(2)))
}
}

def genGroupByCodeGenContext(caseNum: Int): CodegenContext = {
val caseExp = (1 to caseNum).map { i =>
s"case when id > $i and id <= ${i + 1} then 1 else 0 end as v$i"
}.toList
val keyExp = List(
"id",
"(id & 1023) as k1",
"cast(id & 1023 as double) as k2",
"cast(id & 1023 as int) as k3")

Member: Also add withSQLConf to verify the effectiveness of the newly added SQLConf?

eatoncys (Contributor Author): Ok, I have modified it all as you suggested; would you like to review it again? Thanks.

val ds = spark.range(10)
  .selectExpr(keyExp ::: caseExp: _*)
.groupBy("k1", "k2", "k3")
.sum()
val plan = ds.queryExecution.executedPlan

val wholeStageCodeGenExec = plan.find {
  case wp: WholeStageCodegenExec => wp.child match {
    case hp: HashAggregateExec => hp.child.isInstanceOf[ProjectExec]
    case _ => false
  }
  case _ => false
}

assert(wholeStageCodeGenExec.isDefined)
wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._1
}

test("SPARK-21603 check there is a too long generated function") {
withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") {
val ctx = genGroupByCodeGenContext(30)
assert(ctx.isTooLongGeneratedFunction === true)
}
}

test("SPARK-21603 check there is not a too long generated function") {
withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") {
val ctx = genGroupByCodeGenContext(1)
assert(ctx.isTooLongGeneratedFunction === false)
}
}

test("SPARK-21603 check there is not a too long generated function when threshold is Int.Max") {
withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> Int.MaxValue.toString) {
val ctx = genGroupByCodeGenContext(30)
assert(ctx.isTooLongGeneratedFunction === false)
}
}

test("SPARK-21603 check there is a too long generated function when threshold is 0") {
withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "0") {
val ctx = genGroupByCodeGenContext(1)
assert(ctx.isTooLongGeneratedFunction === true)
}
}
}
@@ -301,6 +301,68 @@ class AggregateBenchmark extends BenchmarkBase {
*/
}

ignore("max function length of wholestagecodegen") {
val N = 20 << 15

val benchmark = new Benchmark("max function length of wholestagecodegen", N)
def f(): Unit = sparkSession.range(N)
.selectExpr(
"id",
"(id & 1023) as k1",
"cast(id & 1023 as double) as k2",
"cast(id & 1023 as int) as k3",
"case when id > 100 and id <= 200 then 1 else 0 end as v1",
"case when id > 200 and id <= 300 then 1 else 0 end as v2",
"case when id > 300 and id <= 400 then 1 else 0 end as v3",
"case when id > 400 and id <= 500 then 1 else 0 end as v4",
"case when id > 500 and id <= 600 then 1 else 0 end as v5",
"case when id > 600 and id <= 700 then 1 else 0 end as v6",
"case when id > 700 and id <= 800 then 1 else 0 end as v7",
"case when id > 800 and id <= 900 then 1 else 0 end as v8",
"case when id > 900 and id <= 1000 then 1 else 0 end as v9",
"case when id > 1000 and id <= 1100 then 1 else 0 end as v10",
"case when id > 1100 and id <= 1200 then 1 else 0 end as v11",
"case when id > 1200 and id <= 1300 then 1 else 0 end as v12",
"case when id > 1300 and id <= 1400 then 1 else 0 end as v13",
"case when id > 1400 and id <= 1500 then 1 else 0 end as v14",
"case when id > 1500 and id <= 1600 then 1 else 0 end as v15",
"case when id > 1600 and id <= 1700 then 1 else 0 end as v16",
"case when id > 1700 and id <= 1800 then 1 else 0 end as v17",
"case when id > 1800 and id <= 1900 then 1 else 0 end as v18")
.groupBy("k1", "k2", "k3")
.sum()
.collect()

benchmark.addCase(s"codegen = F") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
f()
}

benchmark.addCase(s"codegen = T maxLinesPerFunction = 10000") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "10000")
f()
}

benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500")
f()
}

benchmark.run()

/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1
Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
max function length of wholestagecodegen:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------
codegen = F                                       462 /  533          1.4         704.4       1.0X
codegen = T maxLinesPerFunction = 10000          3444 / 3447          0.2        5255.3       0.1X
codegen = T maxLinesPerFunction = 1500            447 /  478          1.5         682.1       1.0X
*/
}


ignore("cube") {
val N = 5 << 20