Revert "[SPARK-6122][Core] Upgrade Tachyon client version to 0.6.1."
This reverts commit a41b9c6.
pwendell committed Mar 23, 2015
1 parent 474d132 commit 6cd7058
Showing 4 changed files with 17 additions and 18 deletions.
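The reverted change had moved these call sites onto the Tachyon 0.6.x client API, which wraps every path in a TachyonURI; this commit restores the 0.5.0 client API, which takes plain string paths. As a rough orientation before the diff, here is a minimal sketch of the two call styles, using only calls that appear in the diff below; the master address, directory, and file names are hypothetical placeholders, not values from the commit.

import tachyon.client.TachyonFS
import tachyon.client.TachyonFile

object TachyonClientSketch {
  def main(args: Array[String]): Unit = {
    val master = "tachyon://localhost:19998"   // hypothetical master address

    // 0.5.0 style (restored by this revert): plain string paths everywhere.
    val client: TachyonFS = TachyonFS.get(master)
    val dir = "/spark-tachyon-demo"            // hypothetical directory
    if (!client.exist(dir)) {
      client.mkdir(dir)
    }
    val filePath = dir + "/" + "block-0"       // paths built by string concatenation
    if (!client.exist(filePath)) {
      client.createFile(filePath)
    }
    val file: TachyonFile = client.getFile(filePath)
    client.delete(file.getPath(), false)       // non-recursive delete
    client.close()

    // 0.6.x style (the code being reverted): the same operations, but every
    // path is wrapped in a TachyonURI, e.g.
    //   import tachyon.TachyonURI
    //   val client = TachyonFS.get(new TachyonURI(master))
    //   client.exist(new TachyonURI(filePath))
    //   client.delete(new TachyonURI(file.getPath()), false)
  }
}

The revert touches the Tachyon block manager, Utils.deleteRecursively, and the Tachyon version pinned in core/pom.xml and make-distribution.sh.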
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -275,7 +275,7 @@
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
-<version>0.6.1</version>
+<version>0.5.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
27 changes: 14 additions & 13 deletions core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -20,8 +20,8 @@ package org.apache.spark.storage
import java.text.SimpleDateFormat
import java.util.{Date, Random}

-import tachyon.TachyonURI
-import tachyon.client.{TachyonFile, TachyonFS}
+import tachyon.client.TachyonFS
+import tachyon.client.TachyonFile

import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
@@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager(
val master: String)
extends Logging {

-val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
+val client = if (master != null && master != "") TachyonFS.get(master) else null

if (client == null) {
logError("Failed to connect to the Tachyon as the master address is not configured")
@@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager(
addShutdownHook()

def removeFile(file: TachyonFile): Boolean = {
-client.delete(new TachyonURI(file.getPath()), false)
+client.delete(file.getPath(), false)
}

def fileExists(file: TachyonFile): Boolean = {
-client.exist(new TachyonURI(file.getPath()))
+client.exist(file.getPath())
}

def getFile(filename: String): TachyonFile = {
@@ -81,15 +81,15 @@
if (old != null) {
old
} else {
-val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
+val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId)
client.mkdir(path)
val newDir = client.getFile(path)
subDirs(dirId)(subDirId) = newDir
newDir
}
}
}
-val filePath = new TachyonURI(s"$subDir/$filename")
+val filePath = subDir + "/" + filename
if(!client.exist(filePath)) {
client.createFile(filePath)
}
@@ -101,7 +101,7 @@

// TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
private def createTachyonDirs(): Array[TachyonFile] = {
logDebug(s"Creating tachyon directories at root dirs '$rootDirs'")
logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").map { rootDir =>
var foundLocalDir = false
@@ -113,21 +113,22 @@
tries += 1
try {
tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
-val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
+val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId
if (!client.exist(path)) {
foundLocalDir = client.mkdir(path)
tachyonDir = client.getFile(path)
}
} catch {
case e: Exception =>
logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e)
logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
}
}
if (!foundLocalDir) {
logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir")
logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
rootDir)
System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
}
logInfo(s"Created tachyon directory at $tachyonDir")
logInfo("Created tachyon directory at " + tachyonDir)
tachyonDir
}
}
@@ -144,7 +145,7 @@ private[spark] class TachyonBlockManager(
}
} catch {
case e: Exception =>
logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e)
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}
client.close()
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -42,8 +42,6 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
-
-import tachyon.TachyonURI
import tachyon.client.{TachyonFS, TachyonFile}

import org.apache.spark._
@@ -972,7 +970,7 @@ private[spark] object Utils extends Logging {
* Delete a file or directory and its contents recursively.
*/
def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
-if (!client.delete(new TachyonURI(dir.getPath()), true)) {
+if (!client.delete(dir.getPath(), true)) {
throw new IOException("Failed to delete the tachyon dir: " + dir)
}
}
2 changes: 1 addition & 1 deletion make-distribution.sh
@@ -32,7 +32,7 @@ SPARK_HOME="$(cd "`dirname "$0"`"; pwd)"
DISTDIR="$SPARK_HOME/dist"

SPARK_TACHYON=false
TACHYON_VERSION="0.6.1"
TACHYON_VERSION="0.5.0"
TACHYON_TGZ="tachyon-${TACHYON_VERSION}-bin.tar.gz"
TACHYON_URL="https://github.com/amplab/tachyon/releases/download/v${TACHYON_VERSION}/${TACHYON_TGZ}"

