Merge remote-tracking branch 'origin/master' into nan
JoshRosen committed Jul 19, 2015
2 parents 88bd73c + 6cb6096 commit 983d4fc
Showing 8 changed files with 81 additions and 523 deletions.
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -64,6 +64,9 @@ object MimaExcludes {
             excludePackage("org.apache.spark.sql.execution"),
             // Parquet support is considered private.
             excludePackage("org.apache.spark.sql.parquet"),
+            // The old JSON RDD is removed in favor of streaming Jackson
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
             // local function inside a method
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.sql.SQLContext.org$apache$spark$sql$SQLContext$$needsConversion$1")
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 
 import org.apache.spark.sql.catalyst.expressions._
 
+import scala.collection.mutable.ArrayBuffer
+
 // MutableProjection is not accessible in Java
 abstract class BaseMutableProjection extends MutableProjection

@@ -45,10 +47,41 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => MutableProjection] {
           else
             ${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)};
         """
-    }.mkString("\n")
+    }
+    // collect projections into blocks as function has 64kb codesize limit in JVM
+    val projectionBlocks = new ArrayBuffer[String]()
+    val blockBuilder = new StringBuilder()
+    for (projection <- projectionCode) {
+      if (blockBuilder.length > 16 * 1000) {
+        projectionBlocks.append(blockBuilder.toString())
+        blockBuilder.clear()
+      }
+      blockBuilder.append(projection)
+    }
+    projectionBlocks.append(blockBuilder.toString())
+
+    val (projectionFuns, projectionCalls) = {
+      // inline execution if codesize limit was not broken
+      if (projectionBlocks.length == 1) {
+        ("", projectionBlocks.head)
+      } else {
+        (
+          projectionBlocks.zipWithIndex.map { case (body, i) =>
+            s"""
+               |private void apply$i(InternalRow i) {
+               |  $body
+               |}
+             """.stripMargin
+          }.mkString,
+          projectionBlocks.indices.map(i => s"apply$i(i);").mkString("\n")
+        )
+      }
+    }
+
     val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue) =>
       s"private $javaType $variableName = $initialValue;"
     }.mkString("\n  ")
+
     val code = s"""
       public Object generate($exprType[] expr) {
         return new SpecificProjection(expr);
@@ -75,9 +108,11 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => MutableProjection] {
         return (InternalRow) mutableRow;
       }
 
+      $projectionFuns
+
       public Object apply(Object _i) {
         InternalRow i = (InternalRow) _i;
-        $projectionCode
+        $projectionCalls
 
         return mutableRow;
       }
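
The hunks above work around the JVM's 64 KB bytecode limit per method: rather than concatenating every per-column assignment into one huge apply(), the generator batches the snippets into roughly 16 KB source blocks and, when more than one block results, emits each block as a private apply$i helper that apply() calls in order. Below is a minimal, self-contained Scala sketch of that splitting step; the SplitProjections name and the bare-string inputs are illustrative, not part of the commit.

import scala.collection.mutable.ArrayBuffer

// Sketch of the block-splitting strategy: group per-expression Java snippets
// into ~16 KB source blocks (a heuristic proxy for the JVM's 64 KB
// bytecode-per-method cap), then either inline the single block or emit one
// helper method per block plus the matching call sites.
object SplitProjections {
  def split(projectionCode: Seq[String]): (String, String) = {
    val blocks = new ArrayBuffer[String]()
    val builder = new StringBuilder()
    for (snippet <- projectionCode) {
      if (builder.length > 16 * 1000) {
        blocks.append(builder.toString())
        builder.clear()
      }
      builder.append(snippet)
    }
    blocks.append(builder.toString())

    if (blocks.length == 1) {
      // Small projection: inline the lone block directly into apply().
      ("", blocks.head)
    } else {
      // Wide projection: one private helper per block, called in sequence.
      val funs = blocks.zipWithIndex.map { case (body, i) =>
        s"""
           |private void apply$i(InternalRow i) {
           |  $body
           |}
         """.stripMargin
      }.mkString
      val calls = blocks.indices.map(i => s"apply$i(i);").mkString("\n")
      (funs, calls)
    }
  }
}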
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{DataTypeTestUtils, NullType, StructField, StructType}
 /**
  * Additional tests for code generation.
  */
-class CodeGenerationSuite extends SparkFunSuite {
+class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
 
   test("multithreaded eval") {
     import scala.concurrent._
@@ -56,10 +56,10 @@ class CodeGenerationSuite extends SparkFunSuite {
     val rowOrdering = RowOrdering.forSchema(Seq(dataType, dataType))
     val genOrdering = GenerateOrdering.generate(
       BoundReference(0, dataType, nullable = true).asc ::
-      BoundReference(1, dataType, nullable = true).asc :: Nil)
+        BoundReference(1, dataType, nullable = true).asc :: Nil)
     val rowType = StructType(
       StructField("a", dataType, nullable = true) ::
-      StructField("b", dataType, nullable = true) :: Nil)
+        StructField("b", dataType, nullable = true) :: Nil)
     val maybeDataGenerator = RandomDataGenerator.forType(rowType, nullable = false)
     assume(maybeDataGenerator.isDefined)
     val randGenerator = maybeDataGenerator.get
@@ -81,4 +81,16 @@
       }
     }
   }
+
+  test("SPARK-8443: split wide projections into blocks due to JVM code size limit") {
+    val length = 5000
+    val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1)))
+    val plan = GenerateMutableProjection.generate(expressions)()
+    val actual = plan(new GenericMutableRow(length)).toSeq
+    val expected = Seq.fill(length)(true)
+
+    if (!checkResult(actual, expected)) {
+      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    }
+  }
 }
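
For intuition, the generated class after a split roughly takes the following shape; this is schematic (held here in a Scala string), not the exact codegen output, with method names mirroring the $projectionFuns/$projectionCalls placeholders above.

// Schematic shape of the split generated source (illustrative only).
val generatedShape: String =
  """
    |private void apply0(InternalRow i) { /* first ~16 KB of assignments */ }
    |private void apply1(InternalRow i) { /* next block */ }
    |
    |public Object apply(Object _i) {
    |  InternalRow i = (InternalRow) _i;
    |  apply0(i);
    |  apply1(i);
    |  return mutableRow;
    |}
  """.stripMargin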
15 changes: 3 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,7 +27,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
+import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
@@ -236,17 +236,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
     val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
-    if (sqlContext.conf.useJacksonStreamingAPI) {
-      sqlContext.baseRelationToDataFrame(
-        new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
-    } else {
-      val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
-      val appliedSchema = userSpecifiedSchema.getOrElse(
-        JsonRDD.nullTypeToStringType(
-          JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord)))
-      val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord)
-      sqlContext.internalCreateDataFrame(rowRDD, appliedSchema)
-    }
+    sqlContext.baseRelationToDataFrame(
+      new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
   }
 
   /**
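
With the legacy branch gone, json(RDD[String]) always builds a JSONRelation backed by the streaming Jackson parser. A hedged usage sketch, assuming a live SparkContext sc and SQLContext sqlContext; the sample records are made up:

// "samplingRatio" feeds the extraOptions lookup above; 1.0 (the default)
// means the whole dataset is used for schema inference.
val jsonLines: org.apache.spark.rdd.RDD[String] = sc.parallelize(Seq(
  """{"name": "alice", "age": 30}""",
  """{"name": "bob"}"""))

val df = sqlContext.read
  .option("samplingRatio", "1.0")
  .json(jsonLines)
df.printSchema()  // age is inferred as a numeric type, name as a string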
5 changes: 0 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -401,9 +401,6 @@ private[spark] object SQLConf {
"spark.sql.useSerializer2",
defaultValue = Some(true), isPublic = false)

val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI",
defaultValue = Some(true), doc = "<TODO>")

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -473,8 +470,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2)
 
-  private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API)
-
   private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
   private[spark] def defaultSizeInBytes: Long =
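
The deletion above retires both halves of SQLConf's usual flag pattern: an entry declared with booleanConf and a typed accessor reading it through getConf. A minimal self-contained sketch of that pattern, with MiniConf and BoolConfEntry as simplified stand-ins for Spark's internal SQLConfEntry machinery:

// Simplified stand-ins for SQLConf's entry/accessor pattern (illustrative).
case class BoolConfEntry(key: String, defaultValue: Boolean, isPublic: Boolean = true)

class MiniConf {
  private val settings = scala.collection.mutable.Map[String, String]()
  def setConf(key: String, value: String): Unit = settings(key) = value
  def getConf(entry: BoolConfEntry): Boolean =
    settings.get(entry.key).map(_.toBoolean).getOrElse(entry.defaultValue)
}

// The removed flag followed exactly this shape: keyed entry, Some(true) default.
val useJacksonStreaming = BoolConfEntry("spark.sql.json.useJacksonStreamingAPI", defaultValue = true)
val conf = new MiniConf
assert(conf.getConf(useJacksonStreaming))  // default holds until overridden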
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -157,51 +157,27 @@ private[sql] class JSONRelation(
     }
   }
 
-  private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI
-
   override val needConversion: Boolean = false
 
   override lazy val schema = userSpecifiedSchema.getOrElse {
-    if (useJacksonStreamingAPI) {
-      InferSchema(
-        baseRDD(),
-        samplingRatio,
-        sqlContext.conf.columnNameOfCorruptRecord)
-    } else {
-      JsonRDD.nullTypeToStringType(
-        JsonRDD.inferSchema(
-          baseRDD(),
-          samplingRatio,
-          sqlContext.conf.columnNameOfCorruptRecord))
-    }
+    InferSchema(
+      baseRDD(),
+      samplingRatio,
+      sqlContext.conf.columnNameOfCorruptRecord)
   }
 
   override def buildScan(): RDD[Row] = {
-    if (useJacksonStreamingAPI) {
-      JacksonParser(
-        baseRDD(),
-        schema,
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    } else {
-      JsonRDD.jsonStringToRow(
-        baseRDD(),
-        schema,
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    }
+    JacksonParser(
+      baseRDD(),
+      schema,
+      sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
   }
 
   override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
-    if (useJacksonStreamingAPI) {
-      JacksonParser(
-        baseRDD(),
-        StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    } else {
-      JsonRDD.jsonStringToRow(
-        baseRDD(),
-        StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    }
+    JacksonParser(
+      baseRDD(),
+      StructType.fromAttributes(requiredColumns),
+      sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
   }
 
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
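
After this change JSONRelation has a single code path: InferSchema derives a StructType from a sample of the input, and JacksonParser stream-parses every record against that schema. A condensed sketch of the call order (both helpers are private[sql], so this would live in the org.apache.spark.sql.json package; jsonToRows is an illustrative name, not commit API):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Two-phase flow of the streaming-only relation: infer, then parse.
def jsonToRows(json: RDD[String], samplingRatio: Double, corruptColumn: String): RDD[Row] = {
  val schema: StructType = InferSchema(json, samplingRatio, corruptColumn)  // phase 1
  JacksonParser(json, schema, corruptColumn).map(_.asInstanceOf[Row])       // phase 2
}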