Skips reading row group information when using task side metadata reading
liancheng committed May 18, 2015
1 parent 7aa3748 · commit 2d58a2b
Showing 1 changed file with 17 additions and 10 deletions.
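
In a nutshell: instead of reading every footer one by one with ParquetFileReader.readFooter (which parses complete footers, row group metadata included), the metadata cache now calls Parquet's bulk footer readers and passes the value of `parquet.task.side.metadata` (on by default) as their skipRowGroups flag, so the driver no longer parses row group information at all. Below is a minimal sketch of that code path; it reuses the parquet-mr APIs visible in the diff, but `readFooters` itself is a hypothetical helper, and `conf` and `leaves` (the Hadoop configuration and the leaf-file statuses) are assumed inputs:

    import scala.collection.JavaConversions._

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}
    import parquet.hadoop.{Footer, ParquetFileReader, ParquetInputFormat}

    // Hypothetical helper mirroring the new MetadataCache.refresh() logic.
    def readFooters(
        conf: Configuration,
        leaves: Seq[FileStatus],
        shouldMergeSchemas: Boolean): Map[Path, Footer] = {
      // When task side metadata reading is enabled (the default), row group
      // filtering happens inside tasks, so driver-side footers can skip
      // row group metadata entirely.
      val skipRowGroups = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)

      val rawFooters = if (shouldMergeSchemas) {
        // Schema merging must touch the footer of every part-file.
        ParquetFileReader.readAllFootersInParallel(
          conf, seqAsJavaList(leaves), skipRowGroups)
      } else {
        // Otherwise prefer _metadata / _common_metadata summary files.
        ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
          conf, seqAsJavaList(leaves), skipRowGroups)
      }

      // Key the result by Path so callers can look footers up cheaply.
      rawFooters.map(footer => footer.getFile -> footer).toMap
    }
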
@@ -23,12 +23,11 @@ import scala.collection.JavaConversions._
 import scala.util.Try
 
 import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
 import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
@@ -269,7 +268,7 @@ private[sql] class ParquetRelation2(
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
     conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
 
-    val footers = inputFiles.map(metadataCache.footers)
+    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
     // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
     // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
@@ -330,7 +329,7 @@ private[sql] class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    var footers: Map[FileStatus, Footer] = _
+    var footers: Map[Path, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -357,11 +356,19 @@
       commonMetadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
-        val parquetMetadata = ParquetFileReader.readFooter(
-          SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
-        f -> new Footer(f.getPath, parquetMetadata)
-      }.seq.toMap
+      footers = {
+        val conf = SparkHadoopUtil.get.conf
+        val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
+        val rawFooters = if (shouldMergeSchemas) {
+          ParquetFileReader.readAllFootersInParallel(
+            conf, seqAsJavaList(leaves), taskSideMetaData)
+        } else {
+          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+            conf, seqAsJavaList(leaves), taskSideMetaData)
+        }
+
+        rawFooters.map(footer => footer.getFile -> footer).toMap
+      }
 
       dataSchema = {
         val dataSchema0 =
@@ -428,7 +435,7 @@
         "No schema defined, " +
           s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
 
-      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
     }
   }
 }
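
The footer cache is now keyed by Path rather than FileStatus, which is what the two one-line call-site changes above are about. A hypothetical lookup, assuming `status: FileStatus` and the cache's `footers` map:

    val footer: Footer = footers(status.getPath) // was: footers(status)
    val fileSchema = footer.getParquetMetadata.getFileMetaData.getSchema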