Skip to content

Commit

Permalink
[SPARK-7868] [SQL] Ignores _temporary directories in HadoopFsRelation
Browse files Browse the repository at this point in the history
So that potential partial/corrupted data files left by failed tasks/jobs won't affect normal data scans.

Author: Cheng Lian <[email protected]>

Closes #6411 from liancheng/spark-7868 and squashes the following commits:

273ea36 [Cheng Lian] Ignores _temporary directories
  • Loading branch information
liancheng authored and yhuai committed May 27, 2015
1 parent 0c33c7b commit b463e6d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.SerializableWritable
import org.apache.spark.sql.{Row, _}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.StructType

/**
* ::DeveloperApi::
Expand Down Expand Up @@ -378,24 +378,30 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]

def refresh(): Unit = {
// We don't filter files/directories whose name start with "_" or "." here, as specific data
// sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
// But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave
// partial/corrupted data files there.
def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
if (status.getPath.getName.toLowerCase == "_temporary") {
Set.empty
} else {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
}
}

leafFiles.clear()

// We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
// may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
val statuses = paths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
}

val (dirs, files) = statuses.partition(_.isDir)
val files = statuses.filterNot(_.isDir)
leafFiles ++= files.map(f => f.getPath -> f).toMap
leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,4 +548,20 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
}
}

test("SPARK-7868: _temporary directories should be ignored") {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    val df = Seq("a", "b", "c").zipWithIndex.toDF()

    // Write the real dataset, then a decoy copy under a _temporary subdirectory,
    // simulating partial output left behind by a failed task or job.
    df.write.format("parquet").save(path)
    df.write.format("parquet").save(s"$path/_temporary")

    // A scan of the root must see only the real data — the _temporary subtree
    // must be ignored, so the result matches exactly one copy of df.
    checkAnswer(read.format("parquet").load(path), df.collect())
  }
}
}

0 comments on commit b463e6d

Please sign in to comment.