[SPARK-16736][CORE][SQL] purge superfluous fs calls
A review of the code, working back from Hadoop's `FileSystem.exists()` and `FileSystem.isDirectory()` implementations, removing uses of those calls wherever they are superfluous.

1. `delete()` is harmless when called on a nonexistent path, so don't do any existence checks before deletes.
1. Any `FileSystem.exists()` check before `getFileStatus()` or `open()` is superfluous, as the operation itself does the check. Instead, the `FileNotFoundException` is caught and triggers the downgraded path. Where a `FileNotFoundException` was thrown before, the code still creates a new FNFE with the same error message, though now the inner exception is nested as the cause, for easier diagnostics. (A short sketch of both patterns follows this list.)
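
For illustration, here is a minimal Scala sketch of the two patterns against the Hadoop `FileSystem` API; the helper names (`cleanupPath`, `readSmallFile`) are invented for this example and do not appear in the patch.

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.fs.{FileSystem, Path}

object SuperfluousFsCallsSketch {

  // Pattern 1: delete() is a no-op on a missing path, so drop the exists() probe.
  def cleanupPath(fs: FileSystem, path: Path): Unit = {
    // Before: if (fs.exists(path)) { fs.delete(path, true) }
    fs.delete(path, true)  // returns false when the path was absent; no exception is raised
  }

  // Pattern 2: getFileStatus()/open() already check existence, so call them directly
  // and let FileNotFoundException drive the "missing" branch.
  def readSmallFile(fs: FileSystem, path: Path): Option[Array[Byte]] = {
    try {
      val status = fs.getFileStatus(path)  // throws FileNotFoundException if absent
      val in = fs.open(path)
      try {
        val buf = new Array[Byte](status.getLen.toInt)
        in.readFully(0, buf)  // positioned read of the whole (small) file
        Some(buf)
      } finally {
        in.close()
      }
    } catch {
      case _: FileNotFoundException =>
        // The downgraded path; a rethrown FNFE could nest the caught one as its cause.
        None
    }
  }
}
```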

Initially, validation relies on Jenkins test runs.

One trouble spot is that some of these code paths are clearly error situations, and it's not clear they have test coverage today. Recreating the failure conditions in tests would be ideal, but it will also be hard.
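
As a starting point, the `FileSystem` contract the patch relies on can itself be pinned down by a small self-contained check against the local filesystem (a sketch only, not a test taken from this commit):

```scala
import java.io.FileNotFoundException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: verify the FileSystem semantics the patch depends on, using the local FS.
object FsContractSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val missing = new Path("target/spark-16736-no-such-path")

    // delete() on a nonexistent path returns false and does not throw
    assert(!fs.delete(missing, true))

    // getFileStatus() (and open()) on a nonexistent path throw FileNotFoundException
    val threw =
      try { fs.getFileStatus(missing); false }
      catch { case _: FileNotFoundException => true }
    assert(threw, "expected FileNotFoundException for a missing path")
  }
}
```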

Author: Steve Loughran <[email protected]>

Closes apache#14371 from steveloughran/cloud/SPARK-16736-superfluous-fs-calls.
steveloughran authored and Marcelo Vanzin committed Aug 17, 2016
1 parent 4d92af3 commit cc97ea1
Showing 13 changed files with 92 additions and 109 deletions.
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1410,9 +1410,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val scheme = new URI(schemeCorrectedPath).getScheme
     if (!Array("http", "https", "ftp").contains(scheme)) {
       val fs = hadoopPath.getFileSystem(hadoopConfiguration)
-      if (!fs.exists(hadoopPath)) {
-        throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
-      }
       val isDir = fs.getFileStatus(hadoopPath).isDirectory
       if (!isLocal && scheme == "file" && isDir) {
         throw new SparkException(s"addFile does not support local directories when not running " +
@@ -193,16 +193,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def startPolling(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
-    if (!fs.exists(path)) {
-      var msg = s"Log directory specified does not exist: $logDir"
-      if (logDir == DEFAULT_LOG_DIR) {
-        msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+    try {
+      if (!fs.getFileStatus(path).isDirectory) {
+        throw new IllegalArgumentException(
+          "Logging directory specified is not a directory: %s".format(logDir))
       }
-      throw new IllegalArgumentException(msg)
-    }
-    if (!fs.getFileStatus(path).isDirectory) {
-      throw new IllegalArgumentException(
-        "Logging directory specified is not a directory: %s".format(logDir))
+    } catch {
+      case f: FileNotFoundException =>
+        var msg = s"Log directory specified does not exist: $logDir"
+        if (logDir == DEFAULT_LOG_DIR) {
+          msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+        }
+        throw new FileNotFoundException(msg).initCause(f)
     }
 
     // Disable the background thread during tests.
@@ -495,12 +497,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
       attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, attempt.logPath)
-          if (fs.exists(path)) {
-            if (!fs.delete(path, true)) {
-              logWarning(s"Error deleting ${path}")
-            }
-          }
+          fs.delete(new Path(logDir, attempt.logPath), true)
         } catch {
           case e: AccessControlException =>
             logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -166,9 +166,6 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val tempOutputPath =
       new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
 
-    if (fs.exists(tempOutputPath)) {
-      throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
-    }
     val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
 
     val fileOutputStream = if (blockSize < 0) {
@@ -240,22 +237,20 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
       val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
-      if (fs.exists(partitionerFilePath)) {
-        val fileInputStream = fs.open(partitionerFilePath, bufferSize)
-        val serializer = SparkEnv.get.serializer.newInstance()
-        val deserializeStream = serializer.deserializeStream(fileInputStream)
-        val partitioner = Utils.tryWithSafeFinally[Partitioner] {
-          deserializeStream.readObject[Partitioner]
-        } {
-          deserializeStream.close()
-        }
-        logDebug(s"Read partitioner from $partitionerFilePath")
-        Some(partitioner)
-      } else {
-        logDebug("No partitioner file")
-        None
-      }
+      val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val deserializeStream = serializer.deserializeStream(fileInputStream)
+      val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+        deserializeStream.readObject[Partitioner]
+      } {
+        deserializeStream.close()
+      }
+      logDebug(s"Read partitioner from $partitionerFilePath")
+      Some(partitioner)
     } catch {
+      case e: FileNotFoundException =>
+        logDebug("No partitioner file", e)
+        None
       case NonFatal(e) =>
         logWarning(s"Error reading partitioner from $checkpointDirPath, " +
           s"partitioner will not be recovered which may lead to performance loss", e)
@@ -80,12 +80,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
   /** Clean up the files associated with the checkpoint data for this RDD. */
   def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
     checkpointPath(sc, rddId).foreach { path =>
-      val fs = path.getFileSystem(sc.hadoopConfiguration)
-      if (fs.exists(path)) {
-        if (!fs.delete(path, true)) {
-          logWarning(s"Error deleting ${path.toString()}")
-        }
-      }
+      path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
     }
   }
 }
@@ -91,7 +91,7 @@ private[spark] class EventLoggingListener(
    */
   def start() {
     if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
-      throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
+      throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
     }
 
     val workingPath = logPath + IN_PROGRESS
@@ -100,11 +100,8 @@
     val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
-    if (shouldOverwrite && fileSystem.exists(path)) {
+    if (shouldOverwrite && fileSystem.delete(path, true)) {
       logWarning(s"Event log $path already exists. Overwriting...")
-      if (!fileSystem.delete(path, true)) {
-        logWarning(s"Error deleting $path")
-      }
     }
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -301,12 +298,6 @@ private[spark] object EventLoggingListener extends Logging {
    * @return input stream that holds one JSON record per line.
    */
   def openEventLog(log: Path, fs: FileSystem): InputStream = {
-    // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
-    // IOException when a file does not exist, so try our best to throw a proper exception.
-    if (!fs.exists(log)) {
-      throw new FileNotFoundException(s"File $log does not exist.")
-    }
-
     val in = new BufferedInputStream(fs.open(log))
 
     // Compression codec is encoded as an extension, e.g. app_123.lzf
@@ -17,7 +17,7 @@
 
 package org.apache.spark.repl
 
-import java.io.{ByteArrayOutputStream, FilterInputStream, InputStream, IOException}
+import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream, InputStream, IOException}
 import java.net.{HttpURLConnection, URI, URL, URLEncoder}
 import java.nio.channels.Channels
 
@@ -147,10 +147,11 @@ class ExecutorClassLoader(
   private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(
       pathInDirectory: String): InputStream = {
     val path = new Path(directory, pathInDirectory)
-    if (fileSystem.exists(path)) {
+    try {
       fileSystem.open(path)
-    } else {
-      throw new ClassNotFoundException(s"Class file not found at path $path")
+    } catch {
+      case _: FileNotFoundException =>
+        throw new ClassNotFoundException(s"Class file not found at path $path")
     }
   }
 
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.{DataInputStream, DataOutputStream, IOException}
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -171,7 +171,7 @@ private[state] class HDFSBackedStateStoreProvider(
       if (tempDeltaFileStream != null) {
         tempDeltaFileStream.close()
       }
-      if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
+      if (tempDeltaFile != null) {
         fs.delete(tempDeltaFile, true)
       }
       logInfo("Aborted")
@@ -278,14 +276,12 @@ private[state] class HDFSBackedStateStoreProvider(
 
   /** Initialize the store provider */
   private def initialize(): Unit = {
-    if (!fs.exists(baseDir)) {
+    try {
       fs.mkdirs(baseDir)
-    } else {
-      if (!fs.isDirectory(baseDir)) {
+    } catch {
+      case e: IOException =>
         throw new IllegalStateException(
-          s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
-            s"$baseDir already exists and is not a directory")
-      }
+          s"Cannot use ${id.checkpointLocation} for storing state data for $this: $e ", e)
     }
   }
 
@@ -340,13 +338,16 @@
 
   private def updateFromDeltaFile(version: Long, map: MapType): Unit = {
     val fileToRead = deltaFile(version)
-    if (!fs.exists(fileToRead)) {
-      throw new IllegalStateException(
-        s"Error reading delta file $fileToRead of $this: $fileToRead does not exist")
-    }
     var input: DataInputStream = null
+    val sourceStream = try {
+      fs.open(fileToRead)
+    } catch {
+      case f: FileNotFoundException =>
+        throw new IllegalStateException(
+          s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+    }
     try {
-      input = decompressStream(fs.open(fileToRead))
+      input = decompressStream(sourceStream)
       var eof = false
 
       while(!eof) {
@@ -405,8 +406,6 @@
 
   private def readSnapshotFile(version: Long): Option[MapType] = {
     val fileToRead = snapshotFile(version)
-    if (!fs.exists(fileToRead)) return None
-
     val map = new MapType()
     var input: DataInputStream = null
 
@@ -443,6 +442,9 @@
       }
       logInfo(s"Read snapshot file for version $version of $this from $fileToRead")
       Some(map)
+    } catch {
+      case _: FileNotFoundException =>
+        None
     } finally {
       if (input != null) input.close()
     }
@@ -75,9 +75,7 @@ public void setUp() throws IOException {
     hiveManagedPath = new Path(
       catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
-    if (fs.exists(hiveManagedPath)){
-      fs.delete(hiveManagedPath, true);
-    }
+    fs.delete(hiveManagedPath, true);
 
     List<String> jsonObjects = new ArrayList<>(10);
     for (int i = 0; i < 10; i++) {
@@ -375,7 +375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
-      if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+      fs.delete(filesystemPath, true)
 
       // It is a managed table when we do not specify the location.
       sql(
@@ -117,7 +117,7 @@ object Checkpoint extends Logging {
 
     val path = new Path(checkpointDir)
     val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
-    if (fs.exists(path)) {
+    try {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
         val paths = statuses.map(_.getPath)
@@ -127,9 +127,10 @@
         logWarning(s"Listing $path returned null")
         Seq.empty
       }
-    } else {
-      logWarning(s"Checkpoint directory $path does not exist")
-      Seq.empty
+    } catch {
+      case _: FileNotFoundException =>
+        logWarning(s"Checkpoint directory $path does not exist")
+        Seq.empty
     }
   }
 
@@ -229,9 +230,7 @@ class CheckpointWriter(
           logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
 
           // Write checkpoint to temp file
-          if (fs.exists(tempFile)) {
-            fs.delete(tempFile, true) // just in case it exists
-          }
+          fs.delete(tempFile, true) // just in case it exists
           val fos = fs.create(tempFile)
           Utils.tryWithSafeFinally {
             fos.write(bytes)
@@ -242,9 +241,7 @@
           // If the checkpoint file exists, back it up
          // If the backup exists as well, just delete it, otherwise rename will fail
           if (fs.exists(checkpointFile)) {
-            if (fs.exists(backupFile)) {
-              fs.delete(backupFile, true) // just in case it exists
-            }
+            fs.delete(backupFile, true) // just in case it exists
             if (!fs.rename(checkpointFile, backupFile)) {
               logWarning(s"Could not rename $checkpointFile to $backupFile")
             }
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
+import java.io.FileNotFoundException
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
 import java.util.concurrent.RejectedExecutionException
@@ -231,13 +232,25 @@ private[streaming] class FileBasedWriteAheadLog(
     val logDirectoryPath = new Path(logDirectory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
 
-    if (fileSystem.exists(logDirectoryPath) &&
-        fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
-      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
-      pastLogs.clear()
-      pastLogs ++= logFileInfo
-      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
-      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+    try {
+      // If you call listStatus(file) it returns a stat of the file in the array,
+      // rather than an array listing all the children.
+      // This makes it hard to differentiate listStatus(file) and
+      // listStatus(dir-with-one-child) except by examining the name of the returned status,
+      // and once you've got symlinks in the mix that differentiation isn't easy.
+      // Checking for the path being a directory is one more call to the filesystem, but
+      // leads to much clearer code.
+      if (fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
+        val logFileInfo = logFilesTologInfo(
+          fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+        pastLogs.clear()
+        pastLogs ++= logFileInfo
+        logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+        logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+      }
+    } catch {
+      case _: FileNotFoundException =>
+        // there is no log directory, hence nothing to recover
     }
   }
 