
Commit ba41250

Reuses HadoopFsRelation FileStatusCache in ParquetRelation2
liancheng committed May 18, 2015
1 parent 3d278f7 commit ba41250
Showing 2 changed files with 16 additions and 23 deletions.
File 1 of 2: private[sql] class ParquetRelation2
@@ -175,8 +175,8 @@ private[sql] class ParquetRelation2(
   override def dataSchema: StructType = metadataCache.dataSchema
 
   override private[sql] def refresh(): Unit = {
-    metadataCache.refresh()
     super.refresh()
+    metadataCache.refresh()
   }
 
   // Parquet data source always uses Catalyst internal representations.
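The ordering matters here: super.refresh() now repopulates the shared file
status cache in HadoopFsRelation, and metadataCache.refresh() (next hunk)
consumes that cache through the new cachedLeafStatuses() hook, so the
superclass call has to come first. A minimal sketch of the pattern, with
hypothetical class names standing in for the real ones:

    // Hypothetical sketch: the superclass refresh() repopulates a shared
    // cache; the subclass then derives its own metadata from that cache.
    abstract class CachingRelation {
      protected var cachedStatuses: Seq[String] = Seq.empty
      protected def listFiles(): Seq[String]
      def refresh(): Unit = {
        cachedStatuses = listFiles() // repopulate the shared cache
      }
    }

    class ParquetLikeRelation extends CachingRelation {
      private var footers: Seq[String] = Seq.empty
      override protected def listFiles(): Seq[String] = Seq("part-00000.parquet")
      override def refresh(): Unit = {
        super.refresh()                             // 1. rebuild the cache
        footers = cachedStatuses.map(_ + ".footer") // 2. consume the fresh cache
      }
    }

    object RefreshOrderSketch extends App {
      new ParquetLikeRelation().refresh() // cache first, then derived metadata
    }
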
@@ -346,24 +346,11 @@ private[sql] class ParquetRelation2(
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
      */
     def refresh(): Unit = {
-      // Support either reading a collection of raw Parquet part-files, or a collection of folders
-      // containing Parquet files (e.g. partitioned Parquet table).
-      val baseStatuses = paths.distinct.flatMap { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        Try(fs.getFileStatus(qualified)).toOption
-      }
-      assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
-      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-      val leaves = baseStatuses.flatMap { f =>
-        val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
-        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }
-      }
+      val leaves = cachedLeafStatuses().filter { f =>
+        isSummaryFile(f.getPath) ||
+          !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+      }.toArray
 
       dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
       metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
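With the shared cache in place, refresh() no longer walks the file system
itself; it filters the cached leaf statuses, keeping Parquet summary files
while dropping other hidden or temporary entries. A small sketch of that
predicate, assuming isSummaryFile matches the _metadata/_common_metadata
summary files (the helper body and sample paths below are invented for
illustration):

    import org.apache.hadoop.fs.Path

    object LeafFilterSketch extends App {
      // Stand-in for the private ParquetRelation2.isSummaryFile helper.
      def isSummaryFile(p: Path): Boolean =
        p.getName == "_metadata" || p.getName == "_common_metadata"

      // Keep summary files; drop other hidden/temporary entries.
      def keep(p: Path): Boolean =
        isSummaryFile(p) || !(p.getName.startsWith("_") || p.getName.startsWith("."))

      val names = Seq("part-00000.parquet", "_metadata", "_SUCCESS", ".hidden")
      println(names.map(n => new Path(s"/table/$n")).filter(keep))
      // List(/table/part-00000.parquet, /table/_metadata)
    }
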
File 2 of 2: abstract class HadoopFsRelation (org.apache.spark.sql.sources)
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import scala.collection.mutable
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -382,14 +383,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
     leafDirs.clear()
     leafFiles.clear()
 
+    // We don't filter out files/directories like _temporary/_SUCCESS here, as specific data
+    // sources may take advantage of them (e.g. the Parquet _metadata and _common_metadata files).
     val statuses = paths.flatMap { path =>
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      listLeafFilesAndDirs(fs, fs.getFileStatus(qualified)).filterNot { status =>
-        val name = status.getPath.getName
-        !status.isDir && (name.startsWith("_") || name.startsWith("."))
-      }
+      Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
     }
 
     val (dirs, files) = statuses.partition(_.isDir)
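The Try(...).toOption rewrite preserves the old ParquetRelation2 behavior for
nonexistent input paths: getFileStatus throws for a missing path, and the
listing should skip it rather than fail. A sketch of just that pattern, with a
toy getFileStatus standing in for Hadoop's FileSystem.getFileStatus:

    import java.io.FileNotFoundException
    import scala.util.Try

    object MissingPathSketch extends App {
      // Toy stand-in: throws for paths that do not exist, as Hadoop's does.
      def getFileStatus(path: String): String =
        if (path.startsWith("/exists")) s"FileStatus($path)"
        else throw new FileNotFoundException(path)

      // Try(...).toOption turns the exception into None, so flatMap simply
      // skips missing paths instead of aborting the whole listing.
      val statuses = Seq("/exists/a", "/missing/b").flatMap { p =>
        Try(getFileStatus(p)).toOption
      }
      println(statuses) // List(FileStatus(/exists/a))
    }
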
@@ -404,6 +404,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
     cache
   }
 
+  protected def cachedLeafStatuses(): Set[FileStatus] = {
+    fileStatusCache.leafFiles.values.toSet
+  }
+
   final private[sql] def partitionSpec: PartitionSpec = {
     if (_partitionSpec == null) {
       _partitionSpec = maybePartitionSpec
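cachedLeafStatuses() is the hook that lets subclasses such as ParquetRelation2
reuse the shared cache. Since leafFiles is keyed by path, .values.toSet hands
the caller an immutable snapshot that later cache refreshes cannot mutate. A
sketch with String stand-ins for Path and FileStatus:

    import scala.collection.mutable

    object CacheSnapshotSketch extends App {
      val leafFiles = mutable.Map(
        "/t/part-0" -> "FileStatus(/t/part-0)",
        "/t/part-1" -> "FileStatus(/t/part-1)")

      def cachedLeafStatuses(): Set[String] = leafFiles.values.toSet

      val snapshot = cachedLeafStatuses()
      leafFiles.clear()      // a later cache refresh...
      println(snapshot.size) // ...leaves the snapshot intact: prints 2
    }
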
@@ -481,7 +485,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
     val inputStatuses = inputPaths.flatMap { input =>
       fileStatusCache.leafFiles.values.filter { status =>
         val path = new Path(input)
-        status.getPath.getParent == path || status.getPath == path
+        (status.getPath.getParent == path || status.getPath == path) &&
+          !status.getPath.getName.startsWith("_") &&
+          !status.getPath.getName.startsWith(".")
       }
     }
     buildScan(requiredColumns, filters, inputStatuses)
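Because the cache now retains hidden entries for data sources that want them,
buildScan has to exclude those entries itself: a cached status is selected only
when it is the input path or a direct child of it, and its name does not start
with "_" or ".". A sketch of the tightened predicate on invented paths:

    import org.apache.hadoop.fs.Path

    object ScanFilterSketch extends App {
      def selects(input: Path)(status: Path): Boolean =
        (status.getParent == input || status == input) &&
          !status.getName.startsWith("_") &&
          !status.getName.startsWith(".")

      val input = new Path("/table/part=1")
      val cached = Seq(
        "/table/part=1/f0.parquet", // direct child: selected
        "/table/part=1/_SUCCESS",   // hidden: now filtered out
        "/table/part=2/f1.parquet"  // different directory: not selected
      ).map(new Path(_))

      println(cached.filter(selects(input))) // List(/table/part=1/f0.parquet)
    }
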
