[SPARK-6122][Core] Upgrade Tachyon client version to 0.6.1.
Changes the Tachyon client version from 0.5 to 0.6 in spark core and distribution script.

New dependencies in Tachyon 0.6.0 include

commons-codec:commons-codec:jar:1.5:compile
io.netty:netty-all:jar:4.0.23.Final:compile

These are already in spark core.

Author: Calvin Jia <[email protected]>

Closes #4867 from calvinjia/upgrade_tachyon_0.6.0 and squashes the following commits:

eed9230 [Calvin Jia] Update tachyon version to 0.6.1.
11907b3 [Calvin Jia] Use TachyonURI for tachyon paths instead of strings.
71bf441 [Calvin Jia] Upgrade Tachyon client version to 0.6.0.
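
The change behind the squashed commits above is that the 0.6 client takes tachyon.TachyonURI arguments where the 0.5 client accepted plain path strings, as the diff below shows. For illustration only (not part of this commit), a minimal Scala sketch of that usage; the object name, path, and master address are placeholders, with the default Tachyon master port assumed:

import tachyon.TachyonURI
import tachyon.client.TachyonFS

object TachyonUriExample {
  def main(args: Array[String]): Unit = {
    // Placeholder master address for illustration only (default Tachyon master port assumed).
    val master = "tachyon://localhost:19998"

    // Tachyon 0.6.x expects TachyonURI arguments where 0.5.x took plain path strings.
    val client: TachyonFS = TachyonFS.get(new TachyonURI(master))

    // Paths are likewise wrapped in TachyonURI before being handed to the client.
    val dir = new TachyonURI("/spark-tachyon/example")
    if (!client.exist(dir)) {
      client.mkdir(dir)
    }
    client.close()
  }
}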
calvinjia authored and aarondav committed Mar 22, 2015
1 parent 6ef4863 commit a41b9c6
Showing 4 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -275,7 +275,7 @@
     <dependency>
       <groupId>org.tachyonproject</groupId>
       <artifactId>tachyon-client</artifactId>
-      <version>0.5.0</version>
+      <version>0.6.1</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
27 changes: 13 additions & 14 deletions core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -20,8 +20,8 @@ package org.apache.spark.storage
 import java.text.SimpleDateFormat
 import java.util.{Date, Random}
 
-import tachyon.client.TachyonFS
-import tachyon.client.TachyonFile
+import tachyon.TachyonURI
+import tachyon.client.{TachyonFile, TachyonFS}
 
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
@@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager(
     val master: String)
   extends Logging {
 
-  val client = if (master != null && master != "") TachyonFS.get(master) else null
+  val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
 
   if (client == null) {
     logError("Failed to connect to the Tachyon as the master address is not configured")
@@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager(
   addShutdownHook()
 
   def removeFile(file: TachyonFile): Boolean = {
-    client.delete(file.getPath(), false)
+    client.delete(new TachyonURI(file.getPath()), false)
   }
 
   def fileExists(file: TachyonFile): Boolean = {
-    client.exist(file.getPath())
+    client.exist(new TachyonURI(file.getPath()))
   }
 
   def getFile(filename: String): TachyonFile = {
@@ -81,15 +81,15 @@ private[spark] class TachyonBlockManager(
         if (old != null) {
           old
         } else {
-          val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId)
+          val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
           client.mkdir(path)
           val newDir = client.getFile(path)
           subDirs(dirId)(subDirId) = newDir
           newDir
         }
       }
     }
-    val filePath = subDir + "/" + filename
+    val filePath = new TachyonURI(s"$subDir/$filename")
     if(!client.exist(filePath)) {
       client.createFile(filePath)
     }
@@ -101,7 +101,7 @@ private[spark] class TachyonBlockManager(
 
   // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
   private def createTachyonDirs(): Array[TachyonFile] = {
-    logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
+    logDebug(s"Creating tachyon directories at root dirs '$rootDirs'")
     val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
     rootDirs.split(",").map { rootDir =>
       var foundLocalDir = false
@@ -113,22 +113,21 @@ private[spark] class TachyonBlockManager(
         tries += 1
         try {
           tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
-          val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId
+          val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
           if (!client.exist(path)) {
             foundLocalDir = client.mkdir(path)
             tachyonDir = client.getFile(path)
           }
         } catch {
           case e: Exception =>
-            logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
+            logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e)
         }
       }
       if (!foundLocalDir) {
-        logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
-          rootDir)
+        logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir")
         System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
       }
-      logInfo("Created tachyon directory at " + tachyonDir)
+      logInfo(s"Created tachyon directory at $tachyonDir")
       tachyonDir
     }
   }
@@ -145,7 +144,7 @@ private[spark] class TachyonBlockManager(
           }
         } catch {
           case e: Exception =>
-            logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+            logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e)
         }
       }
       client.close()
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -42,6 +42,8 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.log4j.PropertyConfigurator
 import org.eclipse.jetty.util.MultiException
 import org.json4s._
+
+import tachyon.TachyonURI
 import tachyon.client.{TachyonFS, TachyonFile}
 
 import org.apache.spark._
@@ -970,7 +972,7 @@ private[spark] object Utils extends Logging {
    * Delete a file or directory and its contents recursively.
    */
   def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
-    if (!client.delete(dir.getPath(), true)) {
+    if (!client.delete(new TachyonURI(dir.getPath()), true)) {
       throw new IOException("Failed to delete the tachyon dir: " + dir)
     }
   }
2 changes: 1 addition & 1 deletion make-distribution.sh
@@ -32,7 +32,7 @@ SPARK_HOME="$(cd "`dirname "$0"`"; pwd)"
 DISTDIR="$SPARK_HOME/dist"
 
 SPARK_TACHYON=false
-TACHYON_VERSION="0.5.0"
+TACHYON_VERSION="0.6.1"
 TACHYON_TGZ="tachyon-${TACHYON_VERSION}-bin.tar.gz"
 TACHYON_URL="https://github.com/amplab/tachyon/releases/download/v${TACHYON_VERSION}/${TACHYON_TGZ}"
 
