[SPARK-7673] [SQL] WIP: HadoopFsRelation and ParquetRelation2 performance optimizations

This PR introduces several performance optimizations to `HadoopFsRelation` and `ParquetRelation2`:

1.  Moving `FileStatus` listing from `DataSourceStrategy` into a cache within `HadoopFsRelation`.

    This new cache generalizes and replaces the one used in `ParquetRelation2`.

    This also introduces an interface change: to reuse cached `FileStatus` objects, `HadoopFsRelation.buildScan` methods now receive `Array[FileStatus]` instead of `Array[String]` (see the sketch after this list).

2.  When Parquet task-side metadata reading is enabled, skipping row group information while reading Parquet footers.

    This is basically what PR #5334 does. Also, we now use `ParquetFileReader.readAllFootersInParallel` to read footers in parallel.

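For data source implementers, the resulting `buildScan` shape looks roughly like the sketch below. Only the signature change itself is taken from the patch; the `MyRelation` class and its body are illustrative placeholders, not code from this PR.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{Filter, HadoopFsRelation}

// Illustrative relation: everything except the buildScan signature is a placeholder.
abstract class MyRelation extends HadoopFsRelation {
  // Previously the planner passed inputPaths: Array[String]; now it passes the cached
  // FileStatus objects directly, so implementations can reuse file metadata
  // without listing the file system again.
  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      inputFiles: Array[FileStatus]): RDD[Row] = {
    val paths = inputFiles.map(_.getPath) // no extra file system round trips needed here
    // ... construct and return an RDD[Row] that reads `requiredColumns` from these files ...
    ???
  }
}
```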
Another optimization under consideration: instead of asking `HadoopFsRelation.buildScan` to return an `RDD[Row]` for a single selected partition and then unioning them all, ask it to return a single `RDD[Row]` covering all selected partitions. This is motivated by the fact that the Hadoop configuration broadcast performed in `NewHadoopRDD` accounts for 34% of the time in the microbenchmark below. However, it complicates data source user code, because user code would have to merge partition values manually.
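As an illustration of the trade-off (hypothetical helper names and signatures, not the actual planner code):

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql.Row

object ScanStrategySketch {
  // Current strategy: one scan (hence one NewHadoopRDD, each broadcasting its own
  // Hadoop configuration) per selected partition, unioned afterwards.
  def scanPerPartitionThenUnion(
      sc: SparkContext,
      partitions: Seq[Array[FileStatus]],
      scanOnePartition: Array[FileStatus] => RDD[Row]): RDD[Row] =
    new UnionRDD(sc, partitions.map(scanOnePartition))

  // Alternative under consideration: a single scan over all selected partitions,
  // paying the configuration broadcast only once, at the cost of requiring data
  // source code to merge partition values back into the output rows.
  def scanAllPartitionsAtOnce(
      partitions: Seq[Array[FileStatus]],
      scanAllPartitions: Array[FileStatus] => RDD[Row]): RDD[Row] =
    scanAllPartitions(partitions.flatten.toArray)
}
```

The second form creates a single Hadoop RDD and therefore broadcasts the Hadoop configuration once, but it no longer knows which rows came from which partition, which is why partition values would have to be merged in data source code.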

To check the cost of broadcasting in `NewHadoopRDD`, I also ran the microbenchmark after removing the `broadcast` call from `NewHadoopRDD`. All results are shown below.

### Microbenchmark

#### Preparation code

Generating a partitioned table with 5,000 partitions and 1,000 rows per partition:

```scala
import sqlContext._
import sqlContext.implicits._

// Output location of the generated table (the same path the benchmarking code reads).
val path = "hdfs://localhost:9000/user/lian/5k"

for (n <- 0 until 500) {
  val data = for {
    p <- (n * 10) until ((n + 1) * 10)
    i <- 0 until 1000
  } yield (i, f"val_$i%04d", f"$p%04d")

  data.
    toDF("a", "b", "p").
    write.
    partitionBy("p").
    mode("append").
    parquet(path)
}
```

#### Benchmarking code

```scala
import sqlContext._
import sqlContext.implicits._

import org.apache.spark.sql.types._
import com.google.common.base.Stopwatch

val path = "hdfs://localhost:9000/user/lian/5k"

// Runs `f` n times and prints per-round and average wall-clock time.
def benchmark(n: Int)(f: => Unit) {
  val stopwatch = new Stopwatch()

  def run() = {
    stopwatch.reset()
    stopwatch.start()
    f
    stopwatch.stop()
    stopwatch.elapsedMillis()
  }

  val records = (0 until n).map(_ => run())

  (0 until n).foreach(i => println(s"Round $i: ${records(i)} ms"))
  println(s"Average: ${records.sum / n.toDouble} ms")
}

benchmark(3) { read.parquet(path).explain(extended = true) }
```

#### Results

Before:

```
Round 0: 72528 ms
Round 1: 68938 ms
Round 2: 65372 ms
Average: 68946.0 ms
```

After:

```
Round 0: 59499 ms
Round 1: 53645 ms
Round 2: 53844 ms
Round 3: 49093 ms
Round 4: 50555 ms
Average: 53327.2 ms
```

With Hadoop configuration broadcasting in `NewHadoopRDD` also removed:

(Note that I was testing on a local laptop, so network cost is low.)

```
Round 0: 15806 ms
Round 1: 14394 ms
Round 2: 14699 ms
Round 3: 15334 ms
Round 4: 14123 ms
Average: 14871.2 ms
```

Author: Cheng Lian <[email protected]>

Closes #6225 from liancheng/spark-7673 and squashes the following commits:

2d58a2b [Cheng Lian] Skips reading row group information when using task side metadata reading
7aa3748 [Cheng Lian] Optimizes FileStatusCache by introducing a map from parent directories to child files
ba41250 [Cheng Lian] Reuses HadoopFsRelation FileStatusCache in ParquetRelation2
3d278f7 [Cheng Lian] Fixes a bug when reading a single Parquet data file
b84612a [Cheng Lian] Fixes Scala style issue
6a08b02 [Cheng Lian] WIP: Moves file status cache into HadoopFSRelation
liancheng authored and yhuai committed May 18, 2015
1 parent 530397b commit 9dadf01
Showing 4 changed files with 117 additions and 91 deletions.
Changes to `ParquetRelation2`:
@@ -23,12 +23,11 @@ import scala.collection.JavaConversions._
import scala.util.Try

import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.filter2.predicate.FilterApi
import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
@@ -175,8 +174,8 @@ private[sql] class ParquetRelation2(
override def dataSchema: StructType = metadataCache.dataSchema

override private[sql] def refresh(): Unit = {
metadataCache.refresh()
super.refresh()
metadataCache.refresh()
}

// Parquet data source always uses Catalyst internal representations.
@@ -234,15 +233,15 @@ private[sql] class ParquetRelation2(
override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String]): RDD[Row] = {
inputFiles: Array[FileStatus]): RDD[Row] = {

val job = new Job(SparkHadoopUtil.get.conf)
val conf = ContextUtil.getConfiguration(job)

ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

if (inputPaths.nonEmpty) {
FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}

// Try to push down filters when filter push-down is enabled.
@@ -269,10 +268,7 @@
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)

val inputFileStatuses =
metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))

val footers = inputFileStatuses.map(metadataCache.footers)
val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
@@ -287,7 +283,7 @@

val cacheMetadata = useMetadataCache

@transient val cachedStatuses = inputFileStatuses.map { f =>
@transient val cachedStatuses = inputFiles.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val pathWithAuthority = new Path(f.getPath.toUri.toString)
@@ -333,7 +329,7 @@ private[sql] class ParquetRelation2(
private var commonMetadataStatuses: Array[FileStatus] = _

// Parquet footer cache.
var footers: Map[FileStatus, Footer] = _
var footers: Map[Path, Footer] = _

// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
@@ -349,35 +345,30 @@ private[sql] class ParquetRelation2(
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
// Support either reading a collection of raw Parquet part-files, or a collection of folders
// containing Parquet files (e.g. partitioned Parquet table).
val baseStatuses = paths.distinct.flatMap { p =>
val path = new Path(p)
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
Try(fs.getFileStatus(qualified)).toOption
}
assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))

// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = baseStatuses.flatMap { f =>
val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}
}
val leaves = cachedLeafStatuses().filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}.toArray

dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)

footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
val parquetMetadata = ParquetFileReader.readFooter(
SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
f -> new Footer(f.getPath, parquetMetadata)
}.seq.toMap
footers = {
val conf = SparkHadoopUtil.get.conf
val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
val rawFooters = if (shouldMergeSchemas) {
ParquetFileReader.readAllFootersInParallel(
conf, seqAsJavaList(leaves), taskSideMetaData)
} else {
ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
conf, seqAsJavaList(leaves), taskSideMetaData)
}

rawFooters.map(footer => footer.getFile -> footer).toMap
}

// If we already get the schema, don't need to re-compute it since the schema merging is
// time-consuming.
@@ -448,7 +439,7 @@ private[sql] class ParquetRelation2(
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")

ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
}
}
}
Changes to `DataSourceStrategy`:
@@ -17,20 +17,16 @@

package org.apache.spark.sql.sources

import org.apache.hadoop.fs.Path

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{UnionRDD, RDD}
import org.apache.spark.sql.Row
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.{StructType, UTF8String, StringType}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}

/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -58,7 +54,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters,
(a, _) => t.buildScan(a)) :: Nil

// Scanning partitioned FSBasedRelation
// Scanning partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -86,22 +82,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
t.partitionSpec.partitionColumns,
selectedPartitions) :: Nil

// Scanning non-partitioned FSBasedRelation
// Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.listLeafStatuses(fs, qualifiedPath).map(_.getPath).filterNot { path =>
val name = path.getName
name.startsWith("_") || name.startsWith(".")
}.map(fs.makeQualified(_).toString)
}

pruneFilterProject(
l,
projectList,
filters,
(a, f) => t.buildScan(a, f, inputPaths)) :: Nil
(a, f) => t.buildScan(a, f, t.paths)) :: Nil

case l @ LogicalRelation(t: TableScan) =>
createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -130,16 +117,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
// Paths to all data files within this partition
val dataFilePaths = {
val dirPath = new Path(dir)
val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf)
fs.listStatus(dirPath).map(_.getPath).filterNot { path =>
val name = path.getName
name.startsWith("_") || name.startsWith(".")
}.map(fs.makeQualified(_).toString)
}

// The table scan operator (PhysicalRDD) which retrieves required columns from data files.
// Notice that the schema of data files, represented by `relation.dataSchema`, may contain
// some partition column(s).
@@ -155,7 +132,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
val dataRows = relation.buildScan(nonPartitionColumns, filters, dataFilePaths)
val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))

// Merges data values with partition values.
mergeWithPartitionValues(
(Diff for the remaining changed files is not shown here.)
