diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index f684ddaf7fc92..a95ad751410d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -175,8 +175,8 @@ private[sql] class ParquetRelation2(
   override def dataSchema: StructType = metadataCache.dataSchema
 
   override private[sql] def refresh(): Unit = {
-    metadataCache.refresh()
     super.refresh()
+    metadataCache.refresh()
   }
 
   // Parquet data source always uses Catalyst internal representations.
@@ -346,24 +346,11 @@ private[sql] class ParquetRelation2(
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
      */
     def refresh(): Unit = {
-      // Support either reading a collection of raw Parquet part-files, or a collection of folders
-      // containing Parquet files (e.g. partitioned Parquet table).
-      val baseStatuses = paths.distinct.flatMap { p =>
-        val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        Try(fs.getFileStatus(qualified)).toOption
-      }
-      assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-      // Lists `FileStatus`es of all leaf nodes (files) under all base directories.
-      val leaves = baseStatuses.flatMap { f =>
-        val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
-        SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
-          isSummaryFile(f.getPath) ||
-            !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
-        }
-      }
+      val leaves = cachedLeafStatuses().filter { f =>
+        isSummaryFile(f.getPath) ||
+          !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+      }.toArray
 
       dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
       metadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 894579abf1991..655d2da7ac3c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.sources
 
 import scala.collection.mutable
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -382,14 +383,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       leafDirs.clear()
       leafFiles.clear()
 
+      // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
+      // may take advantage of them (e.g. Parquet _metadata and _common_metadata files).
       val statuses = paths.flatMap { path =>
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        listLeafFilesAndDirs(fs, fs.getFileStatus(qualified)).filterNot { status =>
-          val name = status.getPath.getName
-          !status.isDir && (name.startsWith("_") || name.startsWith("."))
-        }
+        Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
       }
 
       val (dirs, files) = statuses.partition(_.isDir)
@@ -404,6 +404,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     cache
   }
 
+  protected def cachedLeafStatuses(): Set[FileStatus] = {
+    fileStatusCache.leafFiles.values.toSet
+  }
+
   final private[sql] def partitionSpec: PartitionSpec = {
     if (_partitionSpec == null) {
       _partitionSpec = maybePartitionSpec
@@ -481,7 +485,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     val inputStatuses = inputPaths.flatMap { input =>
      fileStatusCache.leafFiles.values.filter { status =>
         val path = new Path(input)
-        status.getPath.getParent == path || status.getPath == path
+        (status.getPath.getParent == path || status.getPath == path) &&
+          !status.getPath.getName.startsWith("_") &&
+          !status.getPath.getName.startsWith(".")
       }
     }
     buildScan(requiredColumns, filters, inputStatuses)
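
For reference, a minimal standalone sketch of the listing behaviour this patch moves into HadoopFsRelation's FileStatusCache.refresh(): a non-existent input path is now skipped via Try(...).toOption instead of propagating a FileNotFoundException, while the "_"/"." name filtering is deferred until buildScan selects its input statuses. ListingSketch and existingStatuses below are hypothetical names used only for illustration; they are not part of the patch.

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object ListingSketch {
  // Qualify each input path against its file system and silently drop paths that
  // do not exist, mirroring Try(fs.getFileStatus(qualified)).toOption in the patch.
  def existingStatuses(paths: Seq[String], hadoopConf: Configuration): Seq[FileStatus] =
    paths.flatMap { p =>
      val hdfsPath = new Path(p)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      Try(fs.getFileStatus(qualified)).toOption
    }
}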