Added checkpoint data of ReceiverInputDStream, improved WALManager, refactored WALSuite.

tdas committed Oct 6, 2014
1 parent a49fd1d commit 5eafb2e
Showing 8 changed files with 288 additions and 144 deletions.
@@ -26,7 +26,7 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.Time

private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()

@@ -40,7 +40,8 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
extends InputDStream[T](ssc_) {

/** Keeps all received blocks information */
private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
private[streaming] lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
private[streaming] override val checkpointData = new ReceiverInputDStreamCheckpointData(this)

/** This is a unique identifier for the network input stream. */
val id = ssc.getNewReceiverStreamId()
@@ -92,3 +93,25 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
}
}

private[streaming] class ReceiverInputDStreamCheckpointData[T: ClassTag](
dstream: ReceiverInputDStream[T]) extends DStreamCheckpointData[T](dstream) {

private var timeToReceivedBlockInfo: Seq[(Time, Array[ReceivedBlockInfo])] = _

override def update(time: Time) {
timeToReceivedBlockInfo = dstream.receivedBlockInfo.toSeq
}

override def cleanup(time: Time) {
// Not required, as the block info is copied wholesale every time update() is called.
}

override def restore() {
Option(timeToReceivedBlockInfo).foreach { blockInfo =>
dstream.receivedBlockInfo.clear()
dstream.receivedBlockInfo ++= blockInfo
}
}
}
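
The checkpoint class above implements the update/cleanup/restore contract of DStreamCheckpointData: update() snapshots the DStream's block-info map wholesale, cleanup() is a no-op, and restore() repopulates the map after recovery. A minimal standalone sketch of that round trip, using hypothetical Time and BlockInfo stand-ins rather than the Spark classes:

import scala.collection.mutable

// Hypothetical stand-ins for Spark's Time and ReceivedBlockInfo.
case class Time(milliseconds: Long)
case class BlockInfo(blockId: String)

class SimpleCheckpointData(liveInfo: mutable.HashMap[Time, Array[BlockInfo]]) {

  // Snapshot taken whenever update() is called.
  private var snapshot: Seq[(Time, Array[BlockInfo])] = _

  // Copy the whole live map; no incremental bookkeeping needed.
  def update(time: Time): Unit = { snapshot = liveInfo.toSeq }

  // Nothing to clean up, since update() always copies everything.
  def cleanup(time: Time): Unit = { }

  // After recovery, repopulate the live map from the last snapshot.
  def restore(): Unit = Option(snapshot).foreach { saved =>
    liveInfo.clear()
    liveInfo ++= saved
  }
}

Copying the whole map on every update keeps cleanup trivial, at the cost of re-serializing entries that have not changed.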


@@ -142,7 +142,7 @@ private[streaming] class ReceiverSupervisorImpl(
}

val time = System.currentTimeMillis
val fileSegmentOption = receivedBlockHandler.store(blockId, receivedBlock) match {
val fileSegmentOption = receivedBlockHandler.storeBlock(blockId, receivedBlock) match {
case f: FileSegment => Some(f)
case _ => None
}
@@ -62,7 +62,7 @@ class ReceivedBlockInfoCheckpointer(
logDirectory: String, conf: SparkConf, hadoopConf: Configuration) {

private val logManager = new WriteAheadLogManager(
logDirectory, conf, hadoopConf, "ReceiverTracker.WriteAheadLogManager")
logDirectory, hadoopConf, threadPoolName = "ReceiverTracker.WriteAheadLogManager")

def read(): Iterator[ReceivedBlockInfo] = {
logManager.readFromLog().map { byteBuffer =>
@@ -76,7 +76,7 @@ }
}

def clear(threshTime: Long) {
logManager.clear(threshTime)
logManager.clearOldLogs(threshTime)
}
}
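
The checkpointer above reads ReceivedBlockInfo records back out of the write ahead log as ByteBuffers (the deserialization body is collapsed in this view). A generic sketch of that serialize/deserialize step, with a hypothetical BlockRecord standing in for ReceivedBlockInfo and plain Java serialization rather than whatever utility the real code uses:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object WalSerdeSketch {
  // Hypothetical record type; anything Serializable works the same way.
  case class BlockRecord(streamId: Int, numRecords: Long)

  // Record -> ByteBuffer, suitable for something like writeToLog(byteBuffer).
  def toByteBuffer(record: BlockRecord): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(record)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  // ByteBuffer -> Record, the inverse step a read() method needs.
  def fromByteBuffer(buffer: ByteBuffer): BlockRecord = {
    val bytes = new Array[Byte](buffer.remaining())
    buffer.get(bytes)
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[BlockRecord] finally in.close()
  }
}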

@@ -19,8 +19,8 @@ private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends Recei
private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock

private[streaming] trait ReceivedBlockHandler {
def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef]
def clear(threshTime: Long) { }
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef]
def clearOldBlocks(threshTime: Long)
}

private[streaming] class BlockManagerBasedBlockHandler(
@@ -29,7 +29,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
storageLevel: StorageLevel
) extends ReceivedBlockHandler {

def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
receivedBlock match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
@@ -42,6 +42,11 @@ private[streaming] class BlockManagerBasedBlockHandler(
}
None
}

def clearOldBlocks(threshTime: Long) {
// Not used, as blocks inserted into the BlockManager are cleared when the DStream clears
// the corresponding BlockRDDs.
}
}
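
The renamed trait methods above give every handler the same store-and-expire contract: storeBlock returns an optional handle describing any extra persistence performed (such as a WAL file segment), and clearOldBlocks expires old data. A standalone sketch of that contract, with hypothetical types in place of StreamBlockId and ReceivedBlock:

import scala.collection.mutable

trait SimpleBlockHandler {
  // Optional handle for any extra persistence performed (e.g. a log segment), or None.
  def storeBlock(blockId: String, block: Array[Byte]): Option[AnyRef]
  def clearOldBlocks(threshTime: Long): Unit
}

// In-memory variant: nothing extra to report, and expiry is handled by whoever owns the
// store, mirroring the no-op clearOldBlocks above.
class InMemoryBlockHandler extends SimpleBlockHandler {
  private val blocks = mutable.Map[String, Array[Byte]]()

  def storeBlock(blockId: String, block: Array[Byte]): Option[AnyRef] = {
    blocks(blockId) = block
    None
  }

  def clearOldBlocks(threshTime: Long): Unit = ()
}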

private[streaming] class WriteAheadLogBasedBlockHandler(
@@ -53,18 +58,23 @@ class WriteAheadLogBasedBlockHandler(
checkpointDir: String
) extends ReceivedBlockHandler with Logging {

private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
private val rotationInterval = conf.getInt(
"spark.streaming.receiver.writeAheadLog.rotationInterval", 60)
private val maxFailures = conf.getInt(
"spark.streaming.receiver.writeAheadLog.maxFailures", 3)

private val logManager = new WriteAheadLogManager(
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString,
conf, hadoopConf, "WriteAheadLogBasedHandler.WriteAheadLogManager"
hadoopConf, rotationInterval, maxFailures,
"WriteAheadLogBasedHandler.WriteAheadLogManager"
)

private val blockStoreTimeout =
conf.getInt("spark.streaming.receiver.blockStoreTimeout", 30) seconds

implicit private val executionContext = ExecutionContext.fromExecutorService(
Utils.newDaemonFixedThreadPool(1, "WriteAheadLogBasedBlockHandler"))

def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
val serializedBlock = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
Expand All @@ -87,4 +97,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(

Some(Await.result(combinedFuture, blockStoreTimeout))
}

def clearOldBlocks(threshTime: Long) {
logManager.clearOldLogs(threshTime)
}
}
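
The handler above writes each block to the BlockManager and to the write ahead log, combining the two futures and waiting with the configured blockStoreTimeout (the store calls themselves are collapsed in this view). A standalone sketch of that pattern, with placeholder sinks instead of the real BlockManager and log manager:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object ParallelStoreSketch extends App {
  implicit val executionContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  // Placeholder sinks; the real handler calls the BlockManager and the WriteAheadLogManager.
  def storeInBlockManager(data: Array[Byte]): Unit = { }
  def storeInWriteAheadLog(data: Array[Byte]): String = "hypothetical-log-segment-handle"

  val data = Array.fill[Byte](16)(1)
  val inBlockManager = Future { storeInBlockManager(data) }
  val inLog = Future { storeInWriteAheadLog(data) }

  // Both writes must finish within the timeout; the log handle is what gets passed back.
  val handle = Await.result(inBlockManager.zip(inLog).map(_._2), 30.seconds)
  println(s"stored block, log segment handle = $handle")

  executionContext.shutdown()  // let the JVM exit once both writes are done
}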
@@ -7,40 +7,45 @@ import scala.concurrent.{ExecutionContext, Future}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.Logging
import org.apache.spark.streaming.storage.WriteAheadLogManager._
import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock}
import org.apache.spark.util.Utils

private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkConf,
hadoopConf: Configuration, threadPoolName: String = "WriteAheadLogManager") extends Logging {
private[streaming] class WriteAheadLogManager(
logDirectory: String,
hadoopConf: Configuration,
rollingIntervalSecs: Int = 60,
maxFailures: Int = 3,
threadPoolName: String = "WriteAheadLogManager",
clock: Clock = new SystemClock
) extends Logging {

private case class LogInfo(startTime: Long, endTime: Long, path: String)

private val logWriterChangeIntervalSeconds =
conf.getInt("spark.streaming.wal.interval", 60)
private val logWriterMaxFailures =
conf.getInt("spark.streaming.wal.maxRetries", 3)
private val pastLogs = new ArrayBuffer[LogInfo]
implicit private val executionContext =
ExecutionContext.fromExecutorService(Utils.newDaemonFixedThreadPool(1, threadPoolName))

private var currentLogPath: String = null
private var currentLogWriter: WriteAheadLogWriter = null
private var currentLogWriterStartTime: Long = -1L
private var currentLogWriterStopTime: Long = -1L

recoverPastLogs()

def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
var fileSegment: FileSegment = null
var failures = 0
var lastException: Exception = null
var fileSegment: FileSegment = null
var succeeded = false
while (!succeeded && failures < logWriterMaxFailures) {
while (!succeeded && failures < maxFailures) {
try {
fileSegment = logWriter.write(byteBuffer)
fileSegment = getLogWriter.write(byteBuffer)
succeeded = true
} catch {
case ex: Exception =>
lastException = ex
logWarning("Failed to ...")
reset()
resetWriter()
failures += 1
}
}
@@ -51,10 +56,10 @@ private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkC
}

def readFromLog(): Iterator[ByteBuffer] = {
WriteAheadLogManager.logsToIterator(pastLogs sortBy { _.startTime} map { _.path}, hadoopConf)
logsToIterator(pastLogs.map{ _.path}, hadoopConf)
}

def clear(threshTime: Long): Unit = {
def clearOldLogs(threshTime: Long): Unit = {
// Get the log files that are older than the threshold time, while accounting for possible
// time skew between the node that issues the threshTime (say, the driver node) and the local
// time at the node where this is being executed (say, a worker node).
@@ -72,7 +77,8 @@ private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkC
synchronized { pastLogs -= logInfo }
logDebug(s"Cleared log file $logInfo")
} catch {
case ex: Exception => logWarning("Could not delete ...")
case ex: Exception =>
logWarning(s"Error clearing log file $logInfo", ex)
}
}
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
@@ -81,31 +87,97 @@ private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkC
Future { deleteFiles() }
}

private def logWriter: WriteAheadLogWriter = synchronized {
val currentTime = System.currentTimeMillis
if (currentLogWriter == null ||
currentTime - currentLogWriterStartTime > logWriterChangeIntervalSeconds * 1000) {
pastLogs += LogInfo(currentLogWriterStartTime, currentTime, currentLogPath)
val newLogPath = new Path(logDirectory, s"data-$currentTime".toString)
private def getLogWriter: WriteAheadLogWriter = synchronized {
val currentTime = clock.currentTime()
if (currentLogPath == null || currentTime > currentLogWriterStopTime) {
resetWriter()
if (currentLogPath != null) {
pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
}
currentLogWriterStartTime = currentTime
currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
val newLogPath = new Path(logDirectory,
timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
currentLogPath = newLogPath.toString
currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
currentLogWriterStartTime = currentTime
}
currentLogWriter
}

private def reset(): Unit = synchronized {
currentLogWriter.close()
currentLogWriter = null
private def recoverPastLogs(): Unit = synchronized {
val logDirectoryPath = new Path(logDirectory)
val fs = logDirectoryPath.getFileSystem(hadoopConf)
if (fs.exists(logDirectoryPath) && fs.getFileStatus(logDirectoryPath).isDir) {
val logFiles = fs.listStatus(logDirectoryPath).map { _.getPath }
pastLogs.clear()
pastLogs ++= logFilesTologInfo(logFiles)
logInfo(s"Recovered ${logFiles.size} log files from $logDirectory")
logDebug(s"Recoved files are:\n${logFiles.mkString("\n")}")
}
}

private def resetWriter(): Unit = synchronized {
if (currentLogWriter != null) {
currentLogWriter.close()
currentLogWriter = null
}
}

/*
private def tryMultipleTimes[T](message: String)(body: => T): T = {
var result: T = null.asInstanceOf[T]
var failures = 0
var lastException: Exception = null
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
result = body
succeeded = true
} catch {
case ex: Exception =>
lastException = ex
resetWriter()
failures += 1
logWarning(message, ex)
}
}
if (!succeeded) {
throw new Exception(s"Failed $message after $failures failures", lastException)
}
result
} */
}

private[storage] object WriteAheadLogManager {

case class LogInfo(startTime: Long, endTime: Long, path: String)

val logFileRegex = """log-(\d+)-(\d+)""".r

def timeToLogFile(startTime: Long, stopTime: Long): String = {
s"log-$startTime-$stopTime"
}

def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
println("Creating log info with " + files.mkString("[","],[","]"))
files.flatMap { file =>
logFileRegex.findFirstIn(file.getName()) match {
case logFileRegex(startTimeStr, stopTimeStr) =>
val startTime = startTimeStr.toLong
val stopTime = stopTimeStr.toLong
Some(LogInfo(startTime, stopTime, file.toString))
case _ => None
}
}.sortBy { _.startTime }
}

def logsToIterator(
chronologicallySortedLogFiles: Seq[String],
hadoopConf: Configuration
): Iterator[ByteBuffer] = {
println("Creating iterator with " + chronologicallySortedLogFiles.mkString("[", "],[", "]"))
chronologicallySortedLogFiles.iterator.map { file =>
println(s"Creating log reader with $file")
new WriteAheadLogReader(file, hadoopConf)
} flatMap { x => x }
}
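
The companion object above defines the log file naming scheme and the regex that parses it back, which is what recoverPastLogs relies on when rebuilding pastLogs from a directory listing. A small self-contained check of that round trip:

object LogFileNamingCheck extends App {
  val logFileRegex = """log-(\d+)-(\d+)""".r

  def timeToLogFile(startTime: Long, stopTime: Long): String = s"log-$startTime-$stopTime"

  val name = timeToLogFile(1000L, 61000L)
  name match {
    case logFileRegex(start, stop) =>
      println(s"parsed startTime=$start stopTime=$stop from $name")  // prints 1000 and 61000
    case _ =>
      println(s"$name is not a recognizable log file name")
  }
}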
@@ -29,7 +29,10 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
private var nextItem: Option[ByteBuffer] = None

override def hasNext: Boolean = synchronized {
assertOpen()
if (closed) {
return false
}

if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
true
} else {
@@ -40,31 +43,30 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
nextItem = Some(ByteBuffer.wrap(buffer))
true
} catch {
case e: EOFException => false
case e: Exception => throw e
case e: EOFException =>
close()
false
case e: Exception =>
close()
throw e
}
}
}

override def next(): ByteBuffer = synchronized {
// TODO: Possible error case where there are not enough bytes in the stream
// TODO: How to handle that?
val data = nextItem.getOrElse {
throw new IllegalStateException("next called without calling hasNext or after hasNext " +
"returned false")
close()
throw new IllegalStateException(
"next called without calling hasNext or after hasNext returned false")
}
nextItem = None // Ensure the next hasNext call loads new data.
data
}

override def close(): Unit = synchronized {
if (!closed) {
instream.close()
}
closed = true
instream.close()
}

private def assertOpen() {
HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the " +
"file.")
}

}
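
After these changes the reader behaves as an ordinary Iterator[ByteBuffer] that returns false from hasNext (and closes itself) once the log is exhausted. A usage sketch that drains any such iterator into byte arrays:

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

object WalReadSketch {
  // Works for any Iterator[ByteBuffer], including a log reader like the one above.
  def readAll(records: Iterator[ByteBuffer]): Seq[Array[Byte]] = {
    val out = ArrayBuffer[Array[Byte]]()
    while (records.hasNext) {
      val buffer = records.next()
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)  // copy the payload out of the buffer
      out += bytes
    }
    out.toSeq
  }
}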