From 2d58a2b2e96d9b90a74e73be24bcc0c479837265 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 19 May 2015 00:04:45 +0800
Subject: [PATCH] Skips reading row group information when using task side
 metadata reading

---
 .../apache/spark/sql/parquet/newParquet.scala | 27 ++++++++++++-------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index a95ad751410d4..2f920f626d441 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -23,12 +23,11 @@ import scala.collection.JavaConversions._
 import scala.util.Try
 
 import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
 import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
@@ -269,7 +268,7 @@ private[sql] class ParquetRelation2(
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
     conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
 
-    val footers = inputFiles.map(metadataCache.footers)
+    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
     // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
     // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
@@ -330,7 +329,7 @@ private[sql] class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    var footers: Map[FileStatus, Footer] = _
+    var footers: Map[Path, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -357,11 +356,19 @@ private[sql] class ParquetRelation2(
       commonMetadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
-        val parquetMetadata = ParquetFileReader.readFooter(
-          SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
-        f -> new Footer(f.getPath, parquetMetadata)
-      }.seq.toMap
+      footers = {
+        val conf = SparkHadoopUtil.get.conf
+        val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
+        val rawFooters = if (shouldMergeSchemas) {
+          ParquetFileReader.readAllFootersInParallel(
+            conf, seqAsJavaList(leaves), taskSideMetaData)
+        } else {
+          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+            conf, seqAsJavaList(leaves), taskSideMetaData)
+        }
+
+        rawFooters.map(footer => footer.getFile -> footer).toMap
+      }
 
       dataSchema = {
         val dataSchema0 =
@@ -428,7 +435,7 @@ private[sql] class ParquetRelation2(
         "No schema defined, " +
           s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
 
-      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
     }
   }
 }
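
For reference, a minimal standalone sketch (not part of the patch) of the footer-reading path this change switches to: with Parquet's task-side metadata mode enabled, footers can be read with row group information skipped, and the resulting cache is keyed by Path rather than FileStatus. The names hadoopConf, leafFiles, and readFooters below are illustrative placeholders, not identifiers from the Spark code base.

// Sketch only: reads Parquet footers in parallel, skipping row group metadata
// when task-side metadata is enabled, and keys the result by file Path.
import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import parquet.hadoop.{Footer, ParquetFileReader, ParquetInputFormat}

def readFooters(hadoopConf: Configuration, leafFiles: Seq[FileStatus]): Map[Path, Footer] = {
  // Defaults to true, as in the patch: with task-side metadata, executors read
  // row group information themselves, so driver-side footers can skip it.
  val skipRowGroups = hadoopConf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
  val rawFooters = ParquetFileReader.readAllFootersInParallel(
    hadoopConf, seqAsJavaList(leafFiles), skipRowGroups)
  rawFooters.map(footer => footer.getFile -> footer).toMap
}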