From 7aa3748aaf3582cec20b2169b1ba439f65bb79ae Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 18 May 2015 22:59:56 +0800
Subject: [PATCH] Optimizes FileStatusCache by introducing a map from parent
 directories to child files

---
 .../sql/sources/DataSourceStrategy.scala      | 10 +++-------
 .../apache/spark/sql/sources/interfaces.scala | 19 ++++++++++++++-----
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index dcf845cc95698..1615a6dcbdb2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,20 +17,16 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{UnionRDD, RDD}
-import org.apache.spark.sql.Row
+import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{StructType, UTF8String, StringType}
-import org.apache.spark.sql._
+import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
+import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 655d2da7ac3c0..811d551213a3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -371,6 +371,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   private class FileStatusCache {
     var leafFiles = mutable.Map.empty[Path, FileStatus]
 
+    var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
+
     var leafDirs = mutable.Map.empty[Path, FileStatus]
 
     def refresh(): Unit = {
@@ -395,6 +397,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       val (dirs, files) = statuses.partition(_.isDir)
       leafDirs ++= dirs.map(d => d.getPath -> d).toMap
       leafFiles ++= files.map(f => f.getPath -> f).toMap
+      leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
   }
 
@@ -483,13 +486,19 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       filters: Array[Filter],
       inputPaths: Array[String]): RDD[Row] = {
     val inputStatuses = inputPaths.flatMap { input =>
-      fileStatusCache.leafFiles.values.filter { status =>
-        val path = new Path(input)
-        (status.getPath.getParent == path || status.getPath == path) &&
-          !status.getPath.getName.startsWith("_") &&
-          !status.getPath.getName.startsWith(".")
+      val path = new Path(input)
+
+      // First assumes `input` is a directory path, and tries to get all files contained in it.
+      fileStatusCache.leafDirToChildrenFiles.getOrElse(
+        path,
+        // Otherwise, `input` might be a file path
+        fileStatusCache.leafFiles.get(path).toArray
+      ).filter { status =>
+        val name = status.getPath.getName
+        !name.startsWith("_") && !name.startsWith(".")
       }
     }
+
     buildScan(requiredColumns, filters, inputStatuses)
   }
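
Reviewer note: a brief sketch of why the new map helps, for anyone reading this outside the full source tree. Before this patch, every call to buildScan filtered all cached leaf files against every input path, costing O(#leafFiles x #inputPaths) Path comparisons; with leafDirToChildrenFiles, the common directory case becomes a single hash-map probe, with a fallback to the per-file map when the input is itself a file. The code below is an illustrative toy, not Spark code: Path and FileStatus are simplified stand-ins for the Hadoop classes, and FileStatusCacheSketch, add, and filesUnder are hypothetical names invented for this example.

  import scala.collection.mutable

  // Simplified stand-ins for org.apache.hadoop.fs.{FileStatus, Path};
  // they exist only to keep the sketch self-contained.
  case class Path(value: String) {
    def getParent: Path = Path(value.substring(0, value.lastIndexOf('/')))
    def getName: String = value.substring(value.lastIndexOf('/') + 1)
  }
  case class FileStatus(getPath: Path)

  object FileStatusCacheSketch {
    val leafFiles = mutable.Map.empty[Path, FileStatus]
    val leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]

    def add(files: Array[FileStatus]): Unit = {
      leafFiles ++= files.map(f => f.getPath -> f).toMap
      // The grouping this patch adds: parent directory -> child files.
      leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
    }

    // Mirrors the new lookup in buildScan: first try `input` as a directory
    // (one map probe), then fall back to treating it as a single file path.
    def filesUnder(input: Path): Array[FileStatus] =
      leafDirToChildrenFiles
        .getOrElse(input, leafFiles.get(input).toArray)
        .filterNot { status =>
          val name = status.getPath.getName
          name.startsWith("_") || name.startsWith(".")
        }

    def main(args: Array[String]): Unit = {
      add(Array(
        FileStatus(Path("/data/part-00000")),
        FileStatus(Path("/data/part-00001")),
        FileStatus(Path("/data/_SUCCESS"))))
      assert(filesUnder(Path("/data")).length == 2)            // directory hit; _SUCCESS filtered out
      assert(filesUnder(Path("/data/part-00000")).length == 1) // single-file fallback
      assert(filesUnder(Path("/other")).isEmpty)               // unknown path
    }
  }

As in the patch itself, hidden and metadata files (names starting with "." or "_") are filtered out after the lookup, so the win comes purely from replacing the linear scan with the map probe.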