SPARK-1305: Support persisting RDD's directly to Tachyon #158

Closed

Commits (56)
791189b  "Adding an option to persist Spark RDD blocks into Tachyon." move the… (RongGu, Mar 16, 2014)
556978b  fix the scalastyle errors (RongGu, Mar 17, 2014)
70ca182  a bit change in comment (RongGu, Mar 21, 2014)
8011a96  fix a brought-in mistake in StorageLevel (RongGu, Mar 21, 2014)
e01a271  update tachyon 0.4.1 (haoyuan, Mar 22, 2014)
dc8ef24  add old storelevel constructor (haoyuan, Mar 22, 2014)
47304b3  make tachyonStore in BlockMananger lazy val; add more comments Storag… (haoyuan, Mar 22, 2014)
e554b1e  add python code (haoyuan, Mar 22, 2014)
fcaeab2  address Aaron's comment (haoyuan, Mar 22, 2014)
e3ddbba  add doc to use Tachyon cache mode. (haoyuan, Mar 22, 2014)
8859371  various minor fixes and clean up (haoyuan, Mar 22, 2014)
776a56c  address patrick's and ali's comments from the previous PR (haoyuan, Mar 22, 2014)
e82909c  minor cleanup (haoyuan, Mar 22, 2014)
bf278fa  fix python tests (haoyuan, Mar 22, 2014)
1dcadf9  typo (haoyuan, Mar 22, 2014)
77be7e8  address mateiz's comment about the temp folder name problem. The impl… (RongGu, Mar 23, 2014)
8968b67  exclude more libraries from tachyon dependency to be the same as refe… (haoyuan, Mar 23, 2014)
6a22c1a  fix scalastyle (haoyuan, Mar 23, 2014)
2825a13  up-merging to the current master branch of the apache spark (RongGu, Mar 24, 2014)
ca14469  bump tachyon version to 0.4.1-thrift (haoyuan, Mar 24, 2014)
716e93b  revert the version (haoyuan, Mar 24, 2014)
d827250  fix JsonProtocolSuie test failure (RongGu, Mar 24, 2014)
6adb58f  Merge branch 'master' of https://github.com/RongGu/spark-1 (RongGu, Mar 24, 2014)
939e467  0.4.1-thrift from maven central (haoyuan, Mar 25, 2014)
bbeb4de  fix the JsonProtocolSuite test failure problem (RongGu, Mar 25, 2014)
eacb2e8  Merge branch 'master' of https://github.com/RongGu/spark-1 (RongGu, Mar 25, 2014)
16c5798  make the dependency on tachyon as tachyon-0.4.1-thrift (RongGu, Mar 25, 2014)
86a2eab  tachyon 0.4.1-thrift is in the staging repo. but jenkins failed to do… (haoyuan, Mar 24, 2014)
fd84156  use randomUUID to generate sparkapp directory name on tachyon;minor c… (RongGu, Mar 27, 2014)
e700d9c  add the SparkTachyonHdfsLR example and some comments (RongGu, Mar 27, 2014)
76805aa  unifies the config properties name prefix; add the configs into docs/… (RongGu, Mar 27, 2014)
c9aeabf  rename the StorgeLevel.TACHYON as StorageLevel.OFF_HEAP (RongGu, Mar 27, 2014)
04301d3  rename StorageLevel.TACHYON to Storage.OFF_HEAP (RongGu, Mar 27, 2014)
4572f9f  reserving the old apply function API of StorageLevel (RongGu, Mar 27, 2014)
49cc724  update docs with off_headp option (haoyuan, Mar 28, 2014)
64348b2  update conf docs. (haoyuan, Mar 28, 2014)
589eafe  use TRY_CACHE instead of MUST_CACHE (haoyuan, Mar 28, 2014)
91fa09d  address patrick's comments (haoyuan, Mar 28, 2014)
be79d77  find a way to clean up some unnecessay metods and classed to make the… (RongGu, Mar 29, 2014)
619a9a8  set number of directories in TachyonStore back to 64; added a TODO ta… (RongGu, Mar 29, 2014)
ed73e19  Merge branch 'master' of github.com:RongGu/spark-1 (haoyuan, Mar 28, 2014)
3dcace4  address matei's comments (haoyuan, Apr 2, 2014)
77d2703  change python api.git status (haoyuan, Apr 2, 2014)
d9a6438  fix for pspark (haoyuan, Apr 2, 2014)
9b97935  address aaron's comments (haoyuan, Apr 2, 2014)
5cc041c  address aaron's comments (haoyuan, Apr 2, 2014)
120e48a  changed the root-level dir name in Tachyon (RongGu, Apr 3, 2014)
8adfcfa  address arron's comment on inTachyonSize (RongGu, Apr 3, 2014)
51149e7  address aaron's comment on returning value of the remove() function i… (RongGu, Apr 3, 2014)
7cd4600  remove some logic code for tachyonstore's replication (RongGu, Apr 3, 2014)
55b5918  address matei's comment on the replication of offHeap storagelevel (RongGu, Apr 4, 2014)
e0f4891  better check offheap. (haoyuan, Apr 4, 2014)
a8b3ec6  merge master branch (haoyuan, Apr 4, 2014)
ae7834b  minor cleanup (haoyuan, Apr 4, 2014)
9f7fa1b  fix code style (haoyuan, Apr 4, 2014)
72b7768  merge master (haoyuan, Apr 5, 2014)
47 changes: 47 additions & 0 deletions core/pom.xml
@@ -200,6 +200,53 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
Review comment (Contributor): The exclusions here don't seem to match the exclusions in the sbt build (https://github.com/RongGu/spark-1/blob/master/project/SparkBuild.scala#L325) -- is there a reason for this difference?

Reply (Contributor): Seems this one excludes more than the sbt one:
excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
This one also excluded junit; there is no particular reason to do so.

Reply (Contributor): Will Powermock and JUnit even be included in the tachyon-client artifact?

Reply (Contributor): No, they won't. So, from Tachyon 0.5.0, we use tachyon-client.

<artifactId>tachyon</artifactId>
<version>0.4.1-thrift</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jsp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
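For comparison with the review thread above, here is a rough sketch of what the sbt side of this dependency could look like in project/SparkBuild.scala. It is only an illustration: the excludeHadoop, excludeCurator, excludeEclipseJetty, and excludePowermock helpers are assumed from the quoted excludeAll(...) call, and the actual rules in the PR's SparkBuild.scala may differ.

    // Sketch of sbt-side exclusion rules mirroring the quoted excludeAll(...);
    // the helper names come from the review comment, the organizations are inferred.
    val excludeHadoop       = ExclusionRule(organization = "org.apache.hadoop")
    val excludeCurator      = ExclusionRule(organization = "org.apache.curator")
    val excludeEclipseJetty = ExclusionRule(organization = "org.eclipse.jetty")
    val excludePowermock    = ExclusionRule(organization = "org.powermock")

    // Tachyon dependency with the same coordinates as the Maven declaration above.
    libraryDependencies += "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(
      excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock)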
46 changes: 33 additions & 13 deletions core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -23,17 +23,18 @@
* Expose some commonly useful storage level constants.
*/
public class StorageLevels {
public static final StorageLevel NONE = create(false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
public static final StorageLevel NONE = create(false, false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);

/**
* Create a new StorageLevel object.
@@ -42,7 +43,26 @@ public class StorageLevels {
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
@Deprecated
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
}

/**
* Create a new StorageLevel object.
* @param useDisk saved to disk, if true
* @param useMemory saved to memory, if true
* @param useOffHeap saved to Tachyon, if true
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
public static StorageLevel create(
boolean useDisk,
boolean useMemory,
boolean useOffHeap,
boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
}
}
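As a usage sketch (not part of this diff), the new level is requested like any other storage level; sc here is assumed to be an existing SparkContext, and the five-flag apply mirrors the Java create(...) above.

    import org.apache.spark.storage.StorageLevel

    // Persist an RDD off-heap into Tachyon; needs a reachable Tachyon master
    // (see the spark.tachyonStore.* properties below).
    val cached = sc.parallelize(1 to 1000).persist(StorageLevel.OFF_HEAP)

    // Equivalent explicit construction:
    // (useDisk, useMemory, useOffHeap, deserialized, replication)
    val offHeap = StorageLevel(false, false, true, false, 1)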
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,14 +19,13 @@ package org.apache.spark

import java.io._
import java.net.URI
import java.util.{Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger

import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
Review comment (Contributor): nit: incorrect import order

import scala.collection.{Map, Set}
import scala.collection.generic.Growable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -130,6 +129,11 @@ class SparkContext(
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")

// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
conf.set("spark.tachyonStore.folderName", tachyonFolderName)

val isLocal = (master == "local" || master.startsWith("local["))

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
@@ -53,7 +53,8 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
false)

case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -105,7 +106,8 @@
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
sparkHostPort, cores),
name = "Executor")
workerUrl.foreach{ url =>
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
@@ -41,13 +41,22 @@ object ExecutorExitCode {
/** DiskStore failed to create a local temporary directory after many attempts. */
val DISK_STORE_FAILED_TO_CREATE_DIR = 53

/** TachyonStore failed to initialize after many attempts. */
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54

/** TachyonStore failed to create a local temporary directory after many attempts. */
val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55

def explainExitCode(exitCode: Int): String = {
Review comment (Contributor): Because you added two new special exit codes above, you should also modify this method to explain them. That's why we have the named exit codes here: to give users a meaningful message if the executor crashes.

Reply (Contributor): Done.

exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
case UNCAUGHT_EXCEPTION_TWICE => "Uncaught exception, and logging the exception failed"
case OOM => "OutOfMemoryError"
case DISK_STORE_FAILED_TO_CREATE_DIR =>
"Failed to create local directory (bad spark.local.dir?)"
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
"TachyonStore failed to create a local temporary directory."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
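A small sketch of how the new codes surface as messages. It assumes placement inside the Spark source tree, since ExecutorExitCode is package-private under org.apache.spark; treat it as illustrative rather than user-facing API.

    package org.apache.spark.executor

    // Prints the human-readable explanations for the two new Tachyon exit codes.
    object TachyonExitCodeCheck {
      def main(args: Array[String]): Unit = {
        println(ExecutorExitCode.explainExitCode(ExecutorExitCode.TACHYON_STORE_FAILED_TO_INITIALIZE))
        println(ExecutorExitCode.explainExitCode(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR))
      }
    }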
86 changes: 71 additions & 15 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,22 +19,20 @@ package org.apache.spark.storage

import java.io.{File, InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random

import akka.actor.{ActorSystem, Cancellable, Props}
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import sun.nio.ch.DirectBuffer

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._


sealed trait Values

case class ByteBufferValues(buffer: ByteBuffer) extends Values
@@ -59,6 +57,17 @@ private[spark] class BlockManager(

private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
var tachyonInitialized = false
private[storage] lazy val tachyonStore: TachyonStore = {
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
val tachyonBlockManager = new TachyonBlockManager(
shuffleBlockManager, tachyonStorePath, tachyonMaster)
tachyonInitialized = true
new TachyonStore(this, tachyonBlockManager)
}

// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
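The lazy val above is what keeps applications that never use OFF_HEAP from ever contacting a Tachyon master, and tachyonInitialized lets the shutdown and removal paths check whether the store was ever forced. A stripped-down sketch of that pattern, with the real TachyonStore replaced by a placeholder string:

    class LazyStoreExample {
      var tachyonInitialized = false

      // Stand-in for the real TachyonStore: nothing is created (and no
      // connection would be opened) until the first access.
      lazy val tachyonStore: String = {
        tachyonInitialized = true
        "connected"
      }

      def stop(): Unit = {
        // Mirrors BlockManager.stop(): only touch the store if it was ever
        // used, so stopping does not force the lazy val just to clear it.
        if (tachyonInitialized) println(s"clearing $tachyonStore")
      }
    }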
@@ -248,8 +257,10 @@
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val inTachyonSize = status.tachyonSize
val onDiskSize = status.diskSize
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
master.updateBlockInfo(
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
} else true
}

@@ -259,22 +270,24 @@
* and the updated in-memory and on-disk sizes.
*/
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
val (newLevel, inMemSize, onDiskSize) = info.synchronized {
val (newLevel, inMemSize, onDiskSize, inTachyonSize) = info.synchronized {
info.level match {
case null =>
(StorageLevel.NONE, 0L, 0L)
(StorageLevel.NONE, 0L, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || onDisk) level.replication else 1
val storageLevel = StorageLevel(onDisk, inMem, deserialized, replication)
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
(storageLevel, memSize, diskSize)
(storageLevel, memSize, diskSize, tachyonSize)
}
}
BlockStatus(newLevel, inMemSize, onDiskSize)
BlockStatus(newLevel, inMemSize, onDiskSize, inTachyonSize)
}

/**
@@ -354,6 +367,24 @@
logDebug("Block " + blockId + " not found in memory")
}
}

// Look for the block in Tachyon
if (level.useOffHeap) {
logDebug("Getting block " + blockId + " from tachyon")
if (tachyonStore.contains(blockId)) {
tachyonStore.getBytes(blockId) match {
case Some(bytes) => {
if (!asValues) {
return Some(bytes)
} else {
return Some(dataDeserialize(blockId, bytes))
}
}
case None =>
logDebug("Block " + blockId + " not found in tachyon")
}
}
}

// Look for block on disk, potentially storing it back into memory if required:
if (level.useDisk) {
@@ -620,6 +651,23 @@
}
// Keep track of which blocks are dropped from memory
res.droppedBlocks.foreach { block => updatedBlocks += block }
} else if (level.useOffHeap) {
// Save to Tachyon.
val res = data match {
case IteratorValues(iterator) =>
tachyonStore.putValues(blockId, iterator, level, false)
case ArrayBufferValues(array) =>
tachyonStore.putValues(blockId, array, level, false)
case ByteBufferValues(bytes) => {
bytes.rewind();
tachyonStore.putBytes(blockId, bytes, level)
}
}
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
@@ -644,8 +692,8 @@

val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory or disk store, let other threads read it,
// and tell the master about it.
// Now that the block is in either the memory, tachyon, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
@@ -707,7 +755,8 @@
*/
var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
@@ -832,9 +881,10 @@
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
logWarning("Block " + blockId + " could not be removed as it was not found in either " +
"the disk or memory store")
"the disk, memory, or tachyon store")
}
blockInfo.remove(blockId)
if (tellMaster && info.tellMaster) {
@@ -871,6 +921,9 @@
if (level.useDisk) {
diskStore.remove(id)
}
if (level.useOffHeap) {
tachyonStore.remove(id)
}
iterator.remove()
logInfo("Dropped block " + id)
}
@@ -946,6 +999,9 @@
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
if (tachyonInitialized) {
tachyonStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
logInfo("BlockManager stopped")
@@ -63,9 +63,10 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean = {
diskSize: Long,
tachyonSize: Long): Boolean = {
val res = askDriverWithReply[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
logInfo("Updated info of block " + blockId)
res
}