From 50948ee5aabc72dff487ce457f2b3e7c5c6ca4e0 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 18 May 2015 12:45:37 -0700
Subject: [PATCH] [SPARK-7673] [SQL] WIP: HadoopFsRelation and ParquetRelation2 performance optimizations

This PR introduces several performance optimizations to `HadoopFsRelation` and `ParquetRelation2`:

1. Moving `FileStatus` listing from `DataSourceStrategy` into a cache within `HadoopFsRelation`.

   This new cache generalizes and replaces the one used in `ParquetRelation2`. It also introduces an interface change: to reuse cached `FileStatus` objects, `HadoopFsRelation.buildScan` methods now receive `Array[FileStatus]` instead of `Array[String]`.

2. When Parquet task-side metadata reading is enabled, skipping row group information while reading Parquet footers.

   This is basically what PR #5334 does. In addition, we now use `ParquetFileReader.readAllFootersInParallel` to read footers in parallel.

Another optimization under consideration: instead of asking `HadoopFsRelation.buildScan` to return an `RDD[Row]` for a single selected partition and then unioning them all, ask it to return a single `RDD[Row]` covering all selected partitions. This is motivated by the fact that the Hadoop configuration broadcasting done in `NewHadoopRDD` accounts for 34% of the time in the microbenchmark below. However, it complicates data source user code, since user code would have to merge partition values manually.

To measure the cost of broadcasting in `NewHadoopRDD`, I also ran the microbenchmark after removing the `broadcast` call from `NewHadoopRDD`. All results are shown below.

### Microbenchmark

#### Preparation code

Generating a partitioned table with 5k partitions, 1k rows per partition:

```scala
import sqlContext._
import sqlContext.implicits._

// Same table path as in the benchmarking code below.
val path = "hdfs://localhost:9000/user/lian/5k"

for (n <- 0 until 500) {
  val data = for {
    p <- (n * 10) until ((n + 1) * 10)
    i <- 0 until 1000
  } yield (i, f"val_$i%04d", f"$p%04d")

  data.
    toDF("a", "b", "p").
    write.
    partitionBy("p").
    mode("append").
    parquet(path)
}
```

#### Benchmarking code

```scala
import sqlContext._
import sqlContext.implicits._
import org.apache.spark.sql.types._

import com.google.common.base.Stopwatch

val path = "hdfs://localhost:9000/user/lian/5k"

def benchmark(n: Int)(f: => Unit) {
  val stopwatch = new Stopwatch()

  def run() = {
    stopwatch.reset()
    stopwatch.start()
    f
    stopwatch.stop()
    stopwatch.elapsedMillis()
  }

  val records = (0 until n).map(_ => run())

  (0 until n).foreach(i => println(s"Round $i: ${records(i)} ms"))
  println(s"Average: ${records.sum / n.toDouble} ms")
}

benchmark(3) { read.parquet(path).explain(extended = true) }
```

#### Results

Before:

```
Round 0: 72528 ms
Round 1: 68938 ms
Round 2: 65372 ms
Average: 68946.0 ms
```

After:

```
Round 0: 59499 ms
Round 1: 53645 ms
Round 2: 53844 ms
Round 3: 49093 ms
Round 4: 50555 ms
Average: 53327.2 ms
```

Also removing Hadoop configuration broadcasting (note that I was testing on a local laptop, so network cost is pretty low):
``` Round 0: 15806 ms Round 1: 14394 ms Round 2: 14699 ms Round 3: 15334 ms Round 4: 14123 ms Average: 14871.2 ms ``` Author: Cheng Lian Closes #6225 from liancheng/spark-7673 and squashes the following commits: 2d58a2b [Cheng Lian] Skips reading row group information when using task side metadata reading 7aa3748 [Cheng Lian] Optimizes FileStatusCache by introducing a map from parent directories to child files ba41250 [Cheng Lian] Reuses HadoopFsRelation FileStatusCache in ParquetRelation2 3d278f7 [Cheng Lian] Fixes a bug when reading a single Parquet data file b84612a [Cheng Lian] Fixes Scala style issue 6a08b02 [Cheng Lian] WIP: Moves file status cache into HadoopFSRelation --- .../apache/spark/sql/parquet/newParquet.scala | 61 +++++----- .../sql/sources/DataSourceStrategy.scala | 37 ++----- .../apache/spark/sql/sources/interfaces.scala | 104 ++++++++++++++---- .../sql/sources/SimpleTextRelation.scala | 6 +- 4 files changed, 117 insertions(+), 91 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index fea54a251461d..7ca44f7b81a2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -23,12 +23,11 @@ import scala.collection.JavaConversions._ import scala.util.Try import com.google.common.base.Objects -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import parquet.filter2.predicate.FilterApi -import parquet.format.converter.ParquetMetadataConverter import parquet.hadoop._ import parquet.hadoop.metadata.CompressionCodecName import parquet.hadoop.util.ContextUtil @@ -175,8 +174,8 @@ private[sql] class ParquetRelation2( override def dataSchema: StructType = metadataCache.dataSchema override private[sql] def refresh(): Unit = { - metadataCache.refresh() super.refresh() + metadataCache.refresh() } // Parquet data source always uses Catalyst internal representations. @@ -234,15 +233,15 @@ private[sql] class ParquetRelation2( override def buildScan( requiredColumns: Array[String], filters: Array[Filter], - inputPaths: Array[String]): RDD[Row] = { + inputFiles: Array[FileStatus]): RDD[Row] = { val job = new Job(SparkHadoopUtil.get.conf) val conf = ContextUtil.getConfiguration(job) ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport]) - if (inputPaths.nonEmpty) { - FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*) + if (inputFiles.nonEmpty) { + FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*) } // Try to push down filters when filter push-down is enabled. @@ -269,10 +268,7 @@ private[sql] class ParquetRelation2( val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString) - val inputFileStatuses = - metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString)) - - val footers = inputFileStatuses.map(metadataCache.footers) + val footers = inputFiles.map(f => metadataCache.footers(f.getPath)) // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`. 
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and @@ -287,7 +283,7 @@ private[sql] class ParquetRelation2( val cacheMetadata = useMetadataCache - @transient val cachedStatuses = inputFileStatuses.map { f => + @transient val cachedStatuses = inputFiles.map { f => // In order to encode the authority of a Path containing special characters such as /, // we need to use the string returned by the URI of the path to create a new Path. val pathWithAuthority = new Path(f.getPath.toUri.toString) @@ -333,7 +329,7 @@ private[sql] class ParquetRelation2( private var commonMetadataStatuses: Array[FileStatus] = _ // Parquet footer cache. - var footers: Map[FileStatus, Footer] = _ + var footers: Map[Path, Footer] = _ // `FileStatus` objects of all data files (Parquet part-files). var dataStatuses: Array[FileStatus] = _ @@ -349,35 +345,30 @@ private[sql] class ParquetRelation2( * Refreshes `FileStatus`es, footers, partition spec, and table schema. */ def refresh(): Unit = { - // Support either reading a collection of raw Parquet part-files, or a collection of folders - // containing Parquet files (e.g. partitioned Parquet table). - val baseStatuses = paths.distinct.flatMap { p => - val path = new Path(p) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory) - Try(fs.getFileStatus(qualified)).toOption - } - assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir)) - // Lists `FileStatus`es of all leaf nodes (files) under all base directories. - val leaves = baseStatuses.flatMap { f => - val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf) - SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f => - isSummaryFile(f.getPath) || - !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) - } - } + val leaves = cachedLeafStatuses().filter { f => + isSummaryFile(f.getPath) || + !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) + }.toArray dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath)) metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE) commonMetadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE) - footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f => - val parquetMetadata = ParquetFileReader.readFooter( - SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER) - f -> new Footer(f.getPath, parquetMetadata) - }.seq.toMap + footers = { + val conf = SparkHadoopUtil.get.conf + val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true) + val rawFooters = if (shouldMergeSchemas) { + ParquetFileReader.readAllFootersInParallel( + conf, seqAsJavaList(leaves), taskSideMetaData) + } else { + ParquetFileReader.readAllFootersInParallelUsingSummaryFiles( + conf, seqAsJavaList(leaves), taskSideMetaData) + } + + rawFooters.map(footer => footer.getFile -> footer).toMap + } // If we already get the schema, don't need to re-compute it since the schema merging is // time-consuming. 
@@ -448,7 +439,7 @@ private[sql] class ParquetRelation2( "No schema defined, " + s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.") - ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext) + ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index e6324b20b3065..1615a6dcbdb2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -17,20 +17,16 @@ package org.apache.spark.sql.sources -import org.apache.hadoop.fs.Path - import org.apache.spark.Logging -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.{UnionRDD, RDD} -import org.apache.spark.sql.Row +import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.types.{StructType, UTF8String, StringType} -import org.apache.spark.sql._ +import org.apache.spark.sql.types.{StringType, StructType, UTF8String} +import org.apache.spark.sql.{SaveMode, Strategy, execution, sources} /** * A Strategy for planning scans over data sources defined using the sources API. @@ -58,7 +54,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { filters, (a, _) => t.buildScan(a)) :: Nil - // Scanning partitioned FSBasedRelation + // Scanning partitioned HadoopFsRelation case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) if t.partitionSpec.partitionColumns.nonEmpty => val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray @@ -86,22 +82,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { t.partitionSpec.partitionColumns, selectedPartitions) :: Nil - // Scanning non-partitioned FSBasedRelation + // Scanning non-partitioned HadoopFsRelation case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) => - val inputPaths = t.paths.map(new Path(_)).flatMap { path => - val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration) - val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.listLeafStatuses(fs, qualifiedPath).map(_.getPath).filterNot { path => - val name = path.getName - name.startsWith("_") || name.startsWith(".") - }.map(fs.makeQualified(_).toString) - } - pruneFilterProject( l, projectList, filters, - (a, f) => t.buildScan(a, f, inputPaths)) :: Nil + (a, f) => t.buildScan(a, f, t.paths)) :: Nil case l @ LogicalRelation(t: TableScan) => createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil @@ -130,16 +117,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Builds RDD[Row]s for each selected partition. 
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => - // Paths to all data files within this partition - val dataFilePaths = { - val dirPath = new Path(dir) - val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf) - fs.listStatus(dirPath).map(_.getPath).filterNot { path => - val name = path.getName - name.startsWith("_") || name.startsWith(".") - }.map(fs.makeQualified(_).toString) - } - // The table scan operator (PhysicalRDD) which retrieves required columns from data files. // Notice that the schema of data files, represented by `relation.dataSchema`, may contain // some partition column(s). @@ -155,7 +132,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // assuming partition columns data stored in data files are always consistent with those // partition values encoded in partition directory paths. val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains) - val dataRows = relation.buildScan(nonPartitionColumns, filters, dataFilePaths) + val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir)) // Merges data values with partition values. mergeWithPartitionValues( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index a82a6758d2537..9b52d1be3df2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -17,14 +17,14 @@ package org.apache.spark.sql.sources +import scala.collection.mutable import scala.util.Try import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ @@ -368,18 +368,61 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio private var _partitionSpec: PartitionSpec = _ + private class FileStatusCache { + var leafFiles = mutable.Map.empty[Path, FileStatus] + + var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] + + var leafDirs = mutable.Map.empty[Path, FileStatus] + + def refresh(): Unit = { + def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = { + val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir) + val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus] + files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir)) + } + + leafDirs.clear() + leafFiles.clear() + + // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources + // may take advantages over them (e.g. Parquet _metadata and _common_metadata files). 
+ val statuses = paths.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(hadoopConf) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _)) + } + + val (dirs, files) = statuses.partition(_.isDir) + leafDirs ++= dirs.map(d => d.getPath -> d).toMap + leafFiles ++= files.map(f => f.getPath -> f).toMap + leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent) + } + } + + private lazy val fileStatusCache = { + val cache = new FileStatusCache + cache.refresh() + cache + } + + protected def cachedLeafStatuses(): Set[FileStatus] = { + fileStatusCache.leafFiles.values.toSet + } + final private[sql] def partitionSpec: PartitionSpec = { if (_partitionSpec == null) { _partitionSpec = maybePartitionSpec .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable)) .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition]))) .getOrElse { - if (sqlContext.conf.partitionDiscoveryEnabled()) { - discoverPartitions() - } else { - PartitionSpec(StructType(Nil), Array.empty[Partition]) + if (sqlContext.conf.partitionDiscoveryEnabled()) { + discoverPartitions() + } else { + PartitionSpec(StructType(Nil), Array.empty[Partition]) + } } - } } _partitionSpec } @@ -409,20 +452,14 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio def userDefinedPartitionColumns: Option[StructType] = None private[sql] def refresh(): Unit = { + fileStatusCache.refresh() if (sqlContext.conf.partitionDiscoveryEnabled()) { _partitionSpec = discoverPartitions() } } private def discoverPartitions(): PartitionSpec = { - val basePaths = paths.map(new Path(_)) - val leafDirs = basePaths.flatMap { path => - val fs = path.getFileSystem(hadoopConf) - Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory))) - .filter(_.isDir) - .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _)) - .getOrElse(Seq.empty[FileStatus]) - }.map(_.getPath) + val leafDirs = fileStatusCache.leafDirs.keys.toSeq if (leafDirs.nonEmpty) { PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME) @@ -444,6 +481,27 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio }) } + private[sources] final def buildScan( + requiredColumns: Array[String], + filters: Array[Filter], + inputPaths: Array[String]): RDD[Row] = { + val inputStatuses = inputPaths.flatMap { input => + val path = new Path(input) + + // First assumes `input` is a directory path, and tries to get all files contained in it. + fileStatusCache.leafDirToChildrenFiles.getOrElse( + path, + // Otherwise, `input` might be a file path + fileStatusCache.leafFiles.get(path).toArray + ).filter { status => + val name = status.getPath.getName + !name.startsWith("_") && !name.startsWith(".") + } + } + + buildScan(requiredColumns, filters, inputStatuses) + } + /** * Specifies schema of actual data files. For partitioned relations, if one or more partitioned * columns are contained in the data files, they should also appear in `dataSchema`. @@ -457,13 +515,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio * this relation. For partitioned relations, this method is called for each selected partition, * and builds an `RDD[Row]` containing all rows within that single partition. 
* - * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the + * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the * relation. For a partitioned relation, it contains paths of all data files in a single * selected partition. * * @since 1.4.0 */ - def buildScan(inputPaths: Array[String]): RDD[Row] = { + def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = { throw new UnsupportedOperationException( "At least one buildScan() method should be overridden to read the relation.") } @@ -474,13 +532,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio * and builds an `RDD[Row]` containing all rows within that single partition. * * @param requiredColumns Required columns. - * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the + * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the * relation. For a partitioned relation, it contains paths of all data files in a single * selected partition. * * @since 1.4.0 */ - def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = { + def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = { // Yeah, to workaround serialization... val dataSchema = this.dataSchema val codegenEnabled = this.codegenEnabled @@ -490,7 +548,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable) }.toSeq - buildScan(inputPaths).mapPartitions { rows => + buildScan(inputFiles).mapPartitions { rows => val buildProjection = if (codegenEnabled) { GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes) } else { @@ -512,7 +570,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio * of all `filters`. The pushed down filters are currently purely an optimization as they * will all be evaluated again. This means it is safe to use them with methods that produce * false positives such as filtering partitions based on a bloom filter. - * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the + * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the * relation. For a partitioned relation, it contains paths of all data files in a single * selected partition. 
* @@ -521,8 +579,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio def buildScan( requiredColumns: Array[String], filters: Array[Filter], - inputPaths: Array[String]): RDD[Row] = { - buildScan(requiredColumns, inputPaths) + inputFiles: Array[FileStatus]): RDD[Row] = { + buildScan(requiredColumns, inputFiles) } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 29b21586f9c2a..09eed6646c55a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -21,7 +21,7 @@ import java.text.NumberFormat import java.util.UUID import com.google.common.base.Objects -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{NullWritable, Text} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} @@ -101,10 +101,10 @@ class SimpleTextRelation( override def hashCode(): Int = Objects.hashCode(paths, maybeDataSchema, dataSchema) - override def buildScan(inputPaths: Array[String]): RDD[Row] = { + override def buildScan(inputStatuses: Array[FileStatus]): RDD[Row] = { val fields = dataSchema.map(_.dataType) - sparkContext.textFile(inputPaths.mkString(",")).map { record => + sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record => Row(record.split(",").zip(fields).map { case (value, dataType) => Cast(Literal(value), dataType).eval() }: _*)
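
For data source authors, the visible change is the third `buildScan` parameter: the planner now passes `Array[FileStatus]` objects drawn from the relation-level file status cache instead of `Array[String]` paths, as the `SimpleTextRelation` hunk above shows. Below is a minimal, hypothetical read-only relation sketched against the new signature; the class name, schema, and parsing logic are illustrative only and not part of this patch, and the write path (`prepareJobForWrite`) is stubbed out.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriterFactory}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical two-column text relation targeting the new buildScan signature.
class KeyValueTextRelation(
    override val paths: Array[String])(
    @transient val sqlContext: SQLContext)
  extends HadoopFsRelation {

  // Fixed schema, just to keep the sketch self-contained.
  override def dataSchema: StructType =
    StructType(Seq(StructField("key", StringType), StructField("value", StringType)))

  // The planner passes cached FileStatus objects, so the data source no longer needs
  // to list or re-qualify input paths itself before reading.
  override def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = {
    val input = inputFiles.map(_.getPath.toString).mkString(",")
    sqlContext.sparkContext.textFile(input).map { line =>
      val Array(key, value) = line.split(",", 2)
      Row(key, value)
    }
  }

  // Write path is out of scope for this sketch.
  override def prepareJobForWrite(job: Job): OutputWriterFactory =
    throw new UnsupportedOperationException("read-only example")
}
```

Because the `FileStatus`es come from `HadoopFsRelation`'s internal `FileStatusCache`, implementations like this avoid the per-scan directory listing that `DataSourceStrategy` previously performed on their behalf.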