Driver HA support using WriteAheadLog #14

Closed · wants to merge 12 commits
streaming/pom.xml (5 additions, 0 deletions)
@@ -68,6 +68,11 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
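
The new mockito-all test dependency presumably backs the unit tests added elsewhere in this PR. As a rough illustration of the pattern it enables (the trait and suite below are hypothetical, not taken from this change):

import org.mockito.Mockito.{mock, verify, when}
import org.scalatest.FunSuite

// Hypothetical example: the trait and suite are illustrative, not part of this PR.
trait BlockStore {
  def store(id: String): Boolean
}

class BlockStoreSuite extends FunSuite {
  test("a mocked store can be stubbed and verified") {
    val store = mock(classOf[BlockStore])          // create the mock
    when(store.store("block-0")).thenReturn(true)  // stub the expected call
    assert(store.store("block-0"))                 // exercise it
    verify(store).store("block-0")                 // verify the interaction
  }
}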
@@ -26,7 +26,7 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.Time

private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()

@@ -21,10 +21,11 @@ import scala.collection.mutable.HashMap
import scala.reflect.ClassTag

import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.{StorageLevel, BlockId}
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.streaming.storage.rdd.HDFSBackedBlockRDD

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -39,9 +40,6 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {

/** Keeps all received blocks information */
private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]

/** This is a unique identifier for the network input stream. */
val id = ssc.getNewReceiverStreamId()

@@ -62,19 +60,27 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
val blockRDD = if (validTime >= graph.startTime) {
val blockInfo = getReceivedBlockInfo(validTime)
val blockIds = blockInfo.map(_.blockId).map { _.asInstanceOf[BlockId] } toArray
val fileSegments = blockInfo.flatMap(_.fileSegmentOption).toArray
logInfo("Stream " + id + ": allocated " + blockInfo.map(_.blockId).mkString(", "))

if (fileSegments.nonEmpty) {
new HDFSBackedBlockRDD[T](ssc.sparkContext, ssc.sparkContext.hadoopConfiguration,
blockIds, fileSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
} else {
new BlockRDD[T](ssc.sc, blockIds)
}
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
new BlockRDD[T](ssc.sc, Array[BlockId]())
}
Some(blockRDD)
}

/** Get information on received blocks. */
private[streaming] def getReceivedBlockInfo(time: Time) = {
receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
private[streaming] def getReceivedBlockInfo(time: Time): Seq[ReceivedBlockInfo] = {
ssc.scheduler.receiverTracker.getReceivedBlocks(time, id)
}

/**
@@ -85,10 +91,6 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
*/
private[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
receivedBlockInfo --= oldReceivedBlocks.keys
logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
ssc.scheduler.receiverTracker.cleanupOldInfo(time - rememberDuration)
}
}

}
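
For context, compute() now pulls its per-batch block metadata from the ReceiverTracker instead of a per-stream HashMap. Judging only from how ReceivedBlockInfo is constructed and consumed in this diff, each record has roughly the shape sketched below; apart from blockId and fileSegmentOption, the field names and the FileSegment import are assumptions.

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.storage.FileSegment  // assumed package, based on the imports in this PR

// Rough sketch inferred from usage; not the actual definition in this PR.
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,                           // id of the receiver input stream
    blockId: StreamBlockId,                  // block stored in the BlockManager
    numRecords: Long,                        // -1 when the record count is unknown
    metadata: Any,                           // receiver-supplied metadata, or null
    fileSegmentOption: Option[FileSegment])  // write-ahead log segment backing the block, if any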
@@ -68,6 +68,7 @@ class HDFSBackedBlockRDD[T: ClassTag](
block.data.asInstanceOf[Iterator[T]]
// Data not found in Block Manager, grab it from HDFS
case None =>
logInfo("Reading partition data from write ahead log " + partition.segment.path)
val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
val dataRead = reader.read(partition.segment)
reader.close()
@@ -25,16 +25,13 @@ import scala.concurrent.Await

import akka.actor.{Actor, Props}
import akka.pattern.ask

import com.google.common.base.Throwables

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkException, Logging, SparkEnv}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler.DeregisterReceiver
import org.apache.spark.streaming.scheduler.AddBlock
import org.apache.spark.streaming.scheduler.RegisterReceiver
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.storage._
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -44,12 +41,26 @@ import org.apache.spark.streaming.scheduler.RegisterReceiver
*/
private[streaming] class ReceiverSupervisorImpl(
receiver: Receiver[_],
env: SparkEnv
env: SparkEnv,
hadoopConf: Configuration,
checkpointDirOption: Option[String]
) extends ReceiverSupervisor(receiver, env.conf) with Logging {

private val blockManager = env.blockManager
private val receivedBlockHandler: ReceivedBlockHandler = {
if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.streamId, receiver.storageLevel)
}
}

private val storageLevel = receiver.storageLevel

/** Remote Akka actor for the ReceiverTracker */
private val trackerActor = {
@@ -108,11 +119,7 @@ private[streaming] class ReceiverSupervisorImpl(
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
pushAndReportBlock(ArrayBufferBlock(arrayBuffer), optionalMetadata, optionalBlockId)
}

/** Store an iterator of received data as a data block into Spark's memory. */
@@ -121,11 +128,7 @@
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, -1, optionalMetadata)
pushAndReportBlock(IteratorBlock(iterator), optionalMetadata, optionalBlockId)
}

/** Store the bytes of received data as a data block into Spark's memory. */
@@ -134,17 +137,32 @@
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
pushAndReportBlock(ByteBufferBlock(bytes), optionalMetadata, optionalBlockId)
}

/** Store block and report it to driver */
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case _ => -1
}

val time = System.currentTimeMillis
blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, -1, optionalMetadata)
}
val fileSegmentOption = receivedBlockHandler.storeBlock(blockId, receivedBlock) match {
case Some(f: FileSegment) => Some(f)
case _ => None
}
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")

/** Report pushed block */
def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
trackerActor ! AddBlock(blockInfo)
val blockInfo = ReceivedBlockInfo(streamId,
blockId, numRecords, optionalMetadata.orNull, fileSegmentOption)
val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
Await.result(future, askTimeout)
logDebug("Reported block " + blockId)
}
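
As the SparkException above spells out, the write-ahead-log path is opt-in and needs a checkpoint directory. A minimal driver-side sketch of enabling it (master, app name, and paths are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")       // placeholder; any cluster manager works
  .setAppName("wal-example")
  // Opt in to the WriteAheadLogBasedBlockHandler for all receivers
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
// Required when the flag above is set; the log files live under the checkpoint directory
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")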

@@ -222,10 +222,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
val receivedBlockInfo = stream.getReceivedBlockInfo(time).toArray
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo.toMap))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}