[SPARK-7718] [SQL] Speed up partitioning by avoiding closure cleaning #6256

Closed · wants to merge 5 commits
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2201,6 +2201,24 @@ private[spark] object Utils extends Logging {
shutdownHooks.remove(ref)
}

/**
* To avoid calling `Utils.getCallSite` for every single RDD we create in the body,
* set a dummy call site that RDDs use instead. This is for performance optimization.
*/
def withDummyCallSite[T](sc: SparkContext)(body: => T): T = {
val oldShortCallSite = sc.getLocalProperty(CallSite.SHORT_FORM)
val oldLongCallSite = sc.getLocalProperty(CallSite.LONG_FORM)
try {
sc.setLocalProperty(CallSite.SHORT_FORM, "")
sc.setLocalProperty(CallSite.LONG_FORM, "")
body
} finally {
// Restore the old ones here
sc.setLocalProperty(CallSite.SHORT_FORM, oldShortCallSite)
sc.setLocalProperty(CallSite.LONG_FORM, oldLongCallSite)
}
}

}

private [util] class SparkShutdownHookManager {
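For context, here is a minimal usage sketch of the withDummyCallSite helper added above, assuming a hypothetical Spark-internal caller (Utils is private[spark], so the enclosing package, object, and method names below are made up):

package org.apache.spark.example  // hypothetical package, only so the private[spark] Utils is visible

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

object DummyCallSiteSketch {
  // Creates one small RDD per input path. Each RDD records a call site on construction;
  // with the dummy short/long forms set as local properties, that lookup returns immediately
  // instead of walking the stack trace once per RDD.
  def rddPerPath(sc: SparkContext, paths: Seq[String]): Seq[RDD[String]] = {
    Utils.withDummyCallSite(sc) {
      paths.map(path => sc.textFile(path))
    }
  }
}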
@@ -33,14 +33,15 @@ import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil

import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
import org.apache.spark.util.Utils

private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -252,57 +253,58 @@ private[sql] class ParquetRelation2(

val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
// footers. Especially when a global arbitrative schema (either from metastore or data source
// DDL) is available.
new SqlNewHadoopRDD(
sc = sqlContext.sparkContext,
broadcastedConf = broadcastedConf,
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
inputFormatClass = classOf[FilteringParquetRowInputFormat],
keyClass = classOf[Void],
valueClass = classOf[Row]) {

val cacheMetadata = useMetadataCache

@transient val cachedStatuses = inputFiles.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val pathWithAuthority = new Path(f.getPath.toUri.toString)

new FileStatus(
f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
}.toSeq

@transient val cachedFooters = footers.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
}.toSeq

// Overridden so we can inject our own cached file statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses

override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
Utils.withDummyCallSite(sqlContext.sparkContext) {
// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
// and footers. Especially when a global arbitrative schema (either from metastore or data
// source DDL) is available.
new SqlNewHadoopRDD(
sc = sqlContext.sparkContext,
broadcastedConf = broadcastedConf,
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
inputFormatClass = classOf[FilteringParquetRowInputFormat],
keyClass = classOf[Void],
valueClass = classOf[Row]) {

val cacheMetadata = useMetadataCache

@transient val cachedStatuses = inputFiles.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val pathWithAuthority = new Path(f.getPath.toUri.toString)

new FileStatus(
f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
}.toSeq

@transient val cachedFooters = footers.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
}.toSeq

// Overridden so we can inject our own cached file statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
}
} else {
new FilteringParquetRowInputFormat
}
} else {
new FilteringParquetRowInputFormat
}

val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
val rawSplits = inputFormat.getSplits(jobContext)
val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
val rawSplits = inputFormat.getSplits(jobContext)

Array.tabulate[SparkPartition](rawSplits.size) { i =>
new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
Array.tabulate[SparkPartition](rawSplits.size) { i =>
new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
}
}
}.values
}.values
}
}

private class MetadataCache {
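The Utils.withDummyCallSite wrapper used in the hunk above only helps because each new RDD consults the call-site local properties before falling back to a stack-trace walk. Below is a simplified, self-contained sketch of that short-circuit; it is an approximation for illustration, not Spark's actual SparkContext.getCallSite, and the property keys stand in for CallSite.SHORT_FORM and CallSite.LONG_FORM.

import java.util.Properties

case class CallSite(shortForm: String, longForm: String)

class CallSiteLookupSketch {
  private val localProperties = new Properties()

  def setLocalProperty(key: String, value: String): Unit = localProperties.setProperty(key, value)
  def getLocalProperty(key: String): String = localProperties.getProperty(key)

  // If a (possibly empty) short form has already been set, reuse it; otherwise pay for the
  // stack-trace walk. withDummyCallSite sets both properties to "", so the cheap branch is taken.
  def getCallSite(expensiveStackWalk: () => CallSite): CallSite = {
    Option(getLocalProperty("callSite.shortForm")).map { short =>
      CallSite(short, Option(getLocalProperty("callSite.longForm")).getOrElse(""))
    }.getOrElse(expensiveStackWalk())
  }
}

// Usage: new CallSiteLookupSketch with both properties set to "" never invokes expensiveStackWalk.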
@@ -17,9 +17,9 @@

package org.apache.spark.sql.sources

import org.apache.spark.{SerializableWritable, Logging}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
import org.apache.spark.util.Utils

/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -197,7 +198,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
}
}

dataRows.mapPartitions { iterator =>
// Since we know for sure that this closure is serializable, we can avoid the overhead
// of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
// this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => {
val dataTypes = requiredColumns.map(schema(_).dataType)
val mutableRow = new SpecificMutableRow(dataTypes)
iterator.map { dataRow =>
@@ -209,6 +213,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
mutableRow.asInstanceOf[expressions.Row]
}
}

// This is an internal RDD whose call site the user should not be concerned with.
// Since we create many of these (one per partition), the time spent on computing
// the call site may add up.
Utils.withDummyCallSite(dataRows.sparkContext) {
new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
}

} else {
dataRows
}
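The comment in the hunk above explains the trade: constructing a MapPartitionsRDD directly is functionally equivalent to dataRows.mapPartitions(mapPartitionsFunc), but skips the ClosureCleaner pass that mapPartitions runs on every call. A hedged sketch of that pattern in isolation follows; MapPartitionsRDD is private[spark], so the package and helper name here are hypothetical.

package org.apache.spark.sql.example  // hypothetical; must live under org.apache.spark for access

import scala.reflect.ClassTag

import org.apache.spark.TaskContext
import org.apache.spark.rdd.{MapPartitionsRDD, RDD}

object NoCleanMapPartitions {
  // Equivalent to prev.mapPartitions(f), except that no ClosureCleaner pass runs, because the
  // caller guarantees that `f` is already serializable and captures nothing problematic.
  def mapPartitionsUncleaned[T: ClassTag, U: ClassTag](prev: RDD[T])(f: Iterator[T] => Iterator[U]): RDD[U] = {
    new MapPartitionsRDD(prev, (_: TaskContext, _: Int, iter: Iterator[T]) => f(iter))
  }
}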
@@ -75,10 +75,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
with SparkHadoopMapReduceUtil
with Logging {

if (initLocalJobFuncOpt.isDefined) {
sc.clean(initLocalJobFuncOpt.get)
}

protected def getJob(): Job = {
val conf: Configuration = broadcastedConf.value.value
// "new Job" will make a copy of the conf. Then, it is
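With the sc.clean call above removed, SqlNewHadoopRDD no longer pays for closure cleaning on every instantiation; the assumption (per SPARK-7718) is that the init function handed in by its callers is already known to be serializable. For illustration, a simplified stand-in for the serializability check that cleaning performs, assuming plain Java serialization rather than Spark's closure serializer:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializabilityCheckSketch {
  // Rough equivalent of the verification step inside closure cleaning: serialize the closure
  // and let a NotSerializableException surface any captured, non-serializable state.
  def ensureSerializable(closure: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(closure)
    } finally {
      out.close()
    }
  }
}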