Optimizes FileStatusCache by introducing a map from parent directories to child files
liancheng committed May 18, 2015
1 parent ba41250 commit 7aa3748
Showing 2 changed files with 17 additions and 12 deletions.
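In short: FileStatusCache gains a second index, leafDirToChildrenFiles, mapping each parent directory to the FileStatuses of its child files. refresh() fills it with one groupBy pass over the listed files, and buildScan() can then resolve a directory input path with a single map lookup instead of filtering every cached leaf file once per path.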
@@ -17,20 +17,16 @@

 package org.apache.spark.sql.sources
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{UnionRDD, RDD}
-import org.apache.spark.sql.Row
+import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{StructType, UTF8String, StringType}
-import org.apache.spark.sql._
+import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
+import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -371,6 +371,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
   private class FileStatusCache {
     var leafFiles = mutable.Map.empty[Path, FileStatus]
 
+    var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
+
     var leafDirs = mutable.Map.empty[Path, FileStatus]
 
     def refresh(): Unit = {
@@ -395,6 +397,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
       val (dirs, files) = statuses.partition(_.isDir)
       leafDirs ++= dirs.map(d => d.getPath -> d).toMap
       leafFiles ++= files.map(f => f.getPath -> f).toMap
+      leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
   }
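As a quick illustration of the added groupBy line, here is what that expression produces on a tiny made-up listing (the paths are invented, and FileStatus is built with Hadoop's plain six-argument constructor purely for the example):

import org.apache.hadoop.fs.{FileStatus, Path}

// Three hypothetical leaf files across two partition directories.
def status(p: String) = new FileStatus(0, false, 1, 0, 0, new Path(p))
val files = Array(
  status("/table/part=1/file-a.parquet"),
  status("/table/part=1/file-b.parquet"),
  status("/table/part=2/file-c.parquet"))

// One pass buckets every file under its parent directory:
// Map(/table/part=1 -> [file-a, file-b], /table/part=2 -> [file-c])
val byParent: Map[Path, Array[FileStatus]] = files.groupBy(_.getPath.getParent)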

@@ -483,13 +486,19 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
       filters: Array[Filter],
       inputPaths: Array[String]): RDD[Row] = {
     val inputStatuses = inputPaths.flatMap { input =>
-      fileStatusCache.leafFiles.values.filter { status =>
-        val path = new Path(input)
-        (status.getPath.getParent == path || status.getPath == path) &&
-          !status.getPath.getName.startsWith("_") &&
-          !status.getPath.getName.startsWith(".")
+      val path = new Path(input)
+
+      // First assumes `input` is a directory path, and tries to get all files contained in it.
+      fileStatusCache.leafDirToChildrenFiles.getOrElse(
+        path,
+        // Otherwise, `input` might be a file path
+        fileStatusCache.leafFiles.get(path).toArray
+      ).filter { status =>
+        val name = status.getPath.getName
+        !name.startsWith("_") && !name.startsWith(".")
       }
     }
 
     buildScan(requiredColumns, filters, inputStatuses)
   }
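The payoff shows up in this buildScan hunk: the old code filtered every cached leaf file once per input path (roughly O(paths × cached files)), while the new code resolves each input with a single hash-map lookup, falling back to leafFiles when the input names a file rather than a directory. A self-contained sketch of that lookup under assumed cache contents (the helper visibleStatuses and all paths are hypothetical, not from the patch):

import org.apache.hadoop.fs.{FileStatus, Path}

// Hypothetical cache contents: one listed directory holding one data file
// and one hidden "_"-prefixed file.
def status(p: String) = new FileStatus(0, false, 1, 0, 0, new Path(p))
val children = Array(
  status("/table/part=1/data.parquet"),
  status("/table/part=1/_metadata"))

val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] =
  Map(new Path("/table/part=1") -> children)
val leafFiles: Map[Path, FileStatus] =
  children.map(f => f.getPath -> f).toMap

// Mirrors the patched lookup: O(1) directory hit, single-file fallback
// (Option.toArray is empty for unknown paths), then dropping names that
// start with "_" or ".".
def visibleStatuses(input: String): Array[FileStatus] = {
  val path = new Path(input)
  leafDirToChildrenFiles
    .getOrElse(path, leafFiles.get(path).toArray)
    .filter { s =>
      val name = s.getPath.getName
      !name.startsWith("_") && !name.startsWith(".")
    }
}

assert(visibleStatuses("/table/part=1").length == 1)              // directory hit
assert(visibleStatuses("/table/part=1/data.parquet").length == 1) // file fallback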

