[SPARK-27097][CHERRY-PICK 2.4] Avoid embedding platform-dependent offsets literally in whole-stage generated code

## What changes were proposed in this pull request?

Spark SQL performs whole-stage code generation to speed up query execution. There are two steps to it:
- Java source code is generated from the physical query plan on the driver. A single version of the source code is generated from a query plan, and sent to all executors.
  - It's compiled to bytecode on the driver to catch compilation errors before sending to executors, but currently only the generated source code gets sent to the executors. The bytecode compilation is for fail-fast only.
- Executors receive the generated source code and compile to bytecode, then the query runs like a hand-written Java program.

In this model, there's an implicit assumption about the driver and executors being run on similar platforms. Some code paths accidentally embedded platform-dependent object layout information into the generated code, such as:
```java
Platform.putLong(buffer, /* offset */ 24, /* value */ 1);
```
This code expects a field to be at offset +24 of the `buffer` object, and sets a value to that field.
But whole-stage code generation generally uses platform-dependent information from the driver. If the object layout is significantly different on the driver and executors, the generated code can be reading/writing to wrong offsets on the executors, causing all kinds of data corruption.

One code pattern that leads to such a problem is the use of `Platform.XXX` constants in generated code, e.g. `Platform.BYTE_ARRAY_OFFSET`.

Bad:
```scala
val baseOffset = Platform.BYTE_ARRAY_OFFSET
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will embed the value of `Platform.BYTE_ARRAY_OFFSET` on the driver into the generated code.

Good:
```scala
val baseOffset = "Platform.BYTE_ARRAY_OFFSET"
// codegen template:
s"Platform.putLong($buffer, $baseOffset, $value);"
```
This will generate the offset symbolically -- `Platform.putLong(buffer, Platform.BYTE_ARRAY_OFFSET, value)`, which will be able to pick up the correct value on the executors.

Caveat: these offset constants are declared as runtime-initialized `static final` in Java, so they're not compile-time constants from the Java language's perspective. It does lead to a slightly increased size of the generated code, but this is necessary for correctness.

NOTE: there can be other patterns that generate platform-dependent code on the driver which is invalid on the executors. e.g. if the endianness is different between the driver and the executors, and if some generated code makes strong assumptions about endianness, it would also be problematic.

## How was this patch tested?

Added a new test suite `WholeStageCodegenSparkSubmitSuite`. This test suite needs to set the driver's extraJavaOptions to force the driver and executor to use different Java object layouts, so it's run as an actual SparkSubmit job.

Authored-by: Kris Mok <kris.mokdatabricks.com>

Closes #24032 from gatorsmile/testFailure.

Lead-authored-by: Kris Mok <[email protected]>
Co-authored-by: gatorsmile <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
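The difference between the "Bad" and "Good" templates can be sketched in plain Java, without a Spark dependency. This is only an illustration: the class `OffsetCodegenSketch`, its method names, and the driver-side value `24` are assumptions made for the example, and the generated statement is just a string here, not code that Spark compiles.

```java
// Illustrative sketch only; not Spark's actual codegen API.
class OffsetCodegenSketch {
    // Hypothetical stand-in for the value of Platform.BYTE_ARRAY_OFFSET as seen on the driver.
    static final int DRIVER_BYTE_ARRAY_OFFSET = 24;

    // "Bad" pattern: the driver's numeric offset is baked into the generated source text.
    static String literalOffsetCode(String buffer, String value) {
        String baseOffset = Integer.toString(DRIVER_BYTE_ARRAY_OFFSET);
        return "Platform.putLong(" + buffer + ", " + baseOffset + ", " + value + ");";
    }

    // "Good" pattern: the generated source names the constant, so whichever JVM
    // compiles and runs the source resolves the offset for its own object layout.
    static String symbolicOffsetCode(String buffer, String value) {
        String baseOffset = "Platform.BYTE_ARRAY_OFFSET";
        return "Platform.putLong(" + buffer + ", " + baseOffset + ", " + value + ");";
    }

    public static void main(String[] args) {
        System.out.println(literalOffsetCode("buffer", "value"));
        System.out.println(symbolicOffsetCode("buffer", "value"));
    }
}
```

The first string carries the driver's layout to every executor; the second defers the lookup to the JVM that actually executes the statement.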
Showing 5 changed files with 108 additions and 16 deletions.
93 changes: 93 additions & 0 deletions
...ore/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
@@ -0,0 +1,93 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.scalatest.{Assertions, BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.TimeLimits

import org.apache.spark.{SparkFunSuite, TestUtils}
import org.apache.spark.deploy.SparkSubmitSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{LocalSparkSession, QueryTest, Row, SparkSession}
import org.apache.spark.sql.functions.{array, col, count, lit}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.ResetSystemProperties

// Due to the need to set driver's extraJavaOptions, this test needs to use actual SparkSubmit.
class WholeStageCodegenSparkSubmitSuite extends SparkFunSuite
  with Matchers
  with BeforeAndAfterEach
  with ResetSystemProperties {

  test("Generated code on driver should not embed platform-specific constant") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)

    // HotSpot JVM specific: Set up a local cluster with the driver/executor using mismatched
    // settings of UseCompressedOops JVM option.
    val argsForSparkSubmit = Seq(
      "--class", WholeStageCodegenSparkSubmitSuite.getClass.getName.stripSuffix("$"),
      "--master", "local-cluster[1,1,1024]",
      "--driver-memory", "1g",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--conf", "spark.driver.extraJavaOptions=-XX:-UseCompressedOops",
      "--conf", "spark.executor.extraJavaOptions=-XX:+UseCompressedOops",
      unusedJar.toString)
    SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..")
  }
}

object WholeStageCodegenSparkSubmitSuite extends Assertions with Logging {

  var spark: SparkSession = _

  def main(args: Array[String]): Unit = {
    TestUtils.configTestLog4j("INFO")

    spark = SparkSession.builder().getOrCreate()

    // Make sure the test is run where the driver and the executors uses different object layouts
    val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
    val executorArrayHeaderSize =
      spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect.head.toInt
    assert(driverArrayHeaderSize > executorArrayHeaderSize)

    val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
      .groupBy(array(col("v"))).agg(count(col("*")))
    val plan = df.queryExecution.executedPlan
    assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)

    val expectedAnswer =
      Row(Array(0), 7178) ::
        Row(Array(1), 7178) ::
        Row(Array(2), 7178) ::
        Row(Array(3), 7177) ::
        Row(Array(4), 7177) ::
        Row(Array(5), 7177) ::
        Row(Array(6), 7177) ::
        Row(Array(7), 7177) ::
        Row(Array(8), 7177) ::
        Row(Array(9), 7177) :: Nil
    val result = df.collect
    QueryTest.sameRows(result.toSeq, expectedAnswer) match {
      case Some(errMsg) => fail(errMsg)
      case _ =>
    }
  }
}
```
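The expected counts in the test follow from simple arithmetic: 71773 = 7177 × 10 + 3, so the remainders 0, 1 and 2 each receive one extra row. The following standalone Java sketch recomputes them without Spark; the class `ExpectedGroupCounts` and its method are hypothetical names introduced only for this illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only; recomputes the test's expected answer without Spark.
class ExpectedGroupCounts {
    // Counts how many ids in [0, n) fall into each group id % buckets.
    static Map<Integer, Long> countByRemainder(long n, int buckets) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (long id = 0; id < n; id++) {
            counts.merge((int) (id % buckets), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same inputs as the test: spark.range(71773) grouped by id % 10.
        countByRemainder(71773L, 10)
            .forEach((v, c) -> System.out.println("v=" + v + " count=" + c));
    }
}
```

A wrong offset in the generated aggregation code would be expected to show up as counts or keys that differ from this plain recomputation.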