Improved WALSuite, and added ReceivedBlockHandlerSuite.
tdas committed Oct 7, 2014
1 parent 5eafb2e commit b86af2d
Showing 6 changed files with 320 additions and 72 deletions.
5 changes: 5 additions & 0 deletions streaming/pom.xml
@@ -68,6 +68,11 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
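The new mockito-all test dependency lets the streaming test suites stub out collaborators. As a quick orientation, here is a minimal, hypothetical sketch of the usual Mockito pattern from a ScalaTest FunSuite; the BlockStore trait and the suite name are illustrative only, not part of this commit.

import org.mockito.Mockito.{mock, verify, when}
import org.scalatest.FunSuite

// Hypothetical collaborator, used only to illustrate stubbing.
trait BlockStore {
  def put(data: String): Boolean
}

class MockitoUsageSketch extends FunSuite {
  test("stub and verify a collaborator") {
    val store = mock(classOf[BlockStore])        // create a mock of the trait
    when(store.put("block-1")).thenReturn(true)  // stub the expected interaction

    assert(store.put("block-1"))                 // stubbed value is returned
    verify(store).put("block-1")                 // and the call was recorded
  }
}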
@@ -7,10 +7,12 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.{BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{Clock, SystemClock}
import org.apache.spark.util.Utils

private[streaming] sealed trait ReceivedBlock
@@ -55,20 +57,22 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
storageLevel: StorageLevel,
conf: SparkConf,
hadoopConf: Configuration,
checkpointDir: String
checkpointDir: String,
clock: Clock = new SystemClock
) extends ReceivedBlockHandler with Logging {

private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
private val rotationInterval = conf.getInt(
"spark.streaming.receiver.writeAheadLog.rotationInterval", 60)
private val rollingInterval = conf.getInt(
"spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
private val maxFailures = conf.getInt(
"spark.streaming.receiver.writeAheadLog.maxFailures", 3)

private val logManager = new WriteAheadLogManager(
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString,
hadoopConf, rotationInterval, maxFailures,
"WriteAheadLogBasedHandler.WriteAheadLogManager"
checkpointDirToLogDir(checkpointDir, streamId),
hadoopConf, rollingInterval, maxFailures,
threadPoolName = "WriteAheadLogBasedHandler.WriteAheadLogManager",
clock = clock
)

implicit private val executionContext = ExecutionContext.fromExecutorService(
@@ -101,4 +105,14 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
def clearOldBlocks(threshTime: Long) {
logManager.clearOldLogs(threshTime)
}

def stop() {
logManager.stop()
}
}

private[streaming] object WriteAheadLogBasedBlockHandler {
def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
}
}
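Taken together, the constructor now accepts a Clock (defaulting to SystemClock) and the new checkpointDirToLogDir helper centralizes where a stream's log files live. Below is a hedged sketch of how a test might wire these up, mirroring ReceivedBlockHandlerSuite further down; the createTestHandler name and literal values are illustrative, and the code assumes it is compiled inside the streaming package since these classes are private[streaming].

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockManager, StorageLevel}
import org.apache.spark.streaming.util.ManualClock

def createTestHandler(blockManager: BlockManager, checkpointDir: String): WriteAheadLogBasedBlockHandler = {
  val conf = new SparkConf()
    .set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1") // roll the log every second
  val clock = new ManualClock  // injected so the test controls when rolling happens
  new WriteAheadLogBasedBlockHandler(
    blockManager, 1, StorageLevel.MEMORY_ONLY_SER, conf, new Configuration(),
    checkpointDir, clock)
}

// All log files for stream 1 then live under the directory computed by the new helper:
//   checkpointDirToLogDir(checkpointDir, 1)  ->  <checkpointDir>/receivedData/1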
@@ -9,7 +9,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.streaming.storage.WriteAheadLogManager._
import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock}
import org.apache.spark.streaming.util.{Clock, SystemClock}
import org.apache.spark.util.Utils

private[streaming] class WriteAheadLogManager(
@@ -55,17 +55,26 @@ private[streaming] class WriteAheadLogManager(
fileSegment
}

def readFromLog(): Iterator[ByteBuffer] = {
logsToIterator(pastLogs.map{ _.path}, hadoopConf)
def readFromLog(): Iterator[ByteBuffer] = synchronized {
val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
println("Reading from the logs: " + logFilesToRead.mkString("\n"))
logFilesToRead.iterator.map { file =>
println(s"Creating log reader with $file")
new WriteAheadLogReader(file, hadoopConf)
} flatMap { x => x }
}

/**
* Delete the log files that are older than the threshold time.
*
* It's important to note that the threshold time is based on the timestamps used in the log
* files, which in turn follow the local system time of the node writing them. So if the node
* calculating the threshTime (say, the driver) and the node writing the logs (say, a worker)
* have clocks that differ, the caller has to account for possible time skew.
*/
def clearOldLogs(threshTime: Long): Unit = {
// Get the log files that are older than the threshold time, while accounting for possible
// time skew between the node issues the threshTime (say, driver node), and the local time at
// the node this is being executed (say, worker node)
val maxTimeSkewMs = 60 * 1000 // 1 minute
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime - maxTimeSkewMs } }
logDebug(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
println(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")

def deleteFiles() {
@@ -87,6 +96,13 @@
Future { deleteFiles() }
}

def stop(): Unit = synchronized {
println("Stopping log manager")
if (currentLogWriter != null) {
currentLogWriter.close()
}
}

private def getLogWriter: WriteAheadLogWriter = synchronized {
val currentTime = clock.currentTime()
if (currentLogPath == null || currentTime > currentLogWriterStopTime) {
@@ -112,12 +128,13 @@
pastLogs.clear()
pastLogs ++= logFilesTologInfo(logFiles)
logInfo(s"Recovered ${logFiles.size} log files from $logDirectory")
logDebug(s"Recoved files are:\n${logFiles.mkString("\n")}")
logDebug(s"Recovered files are:\n${logFiles.mkString("\n")}")
}
}

private def resetWriter(): Unit = synchronized {
if (currentLogWriter != null) {
currentLogWriter
currentLogWriter.close()
currentLogWriter = null
}
@@ -170,15 +187,4 @@ private[storage] object WriteAheadLogManager {
}
}.sortBy { _.startTime }
}

def logsToIterator(
chronologicallySortedLogFiles: Seq[String],
hadoopConf: Configuration
): Iterator[ByteBuffer] = {
println("Creating iterator with " + chronologicallySortedLogFiles.mkString("[", "],[", "]"))
chronologicallySortedLogFiles.iterator.map { file =>
println(s"Creating log reader with $file")
new WriteAheadLogReader(file, hadoopConf)
} flatMap { x => x }
}
}
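Two behavioral notes on this file: readFromLog now also includes the current log file and builds one WriteAheadLogReader per file lazily as the iterator is consumed, and clearOldLogs no longer subtracts a skew allowance itself; the new doc comment pushes that responsibility to the caller. A hedged, caller-side sketch of accounting for skew, reusing the one-minute allowance that was removed from the manager (the helper name is illustrative, and streaming-internal access is assumed since the class is private[streaming]):

// Illustrative only: compensate for clock skew before asking the manager to delete logs,
// since clearOldLogs now compares threshTime directly against the log files' end times.
def clearWithSkewAllowance(logManager: WriteAheadLogManager, threshTime: Long): Unit = {
  val maxTimeSkewMs = 60 * 1000  // 1 minute, the value previously hard-coded in the manager
  logManager.clearOldLogs(threshTime - maxTimeSkewMs)
}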
@@ -55,10 +55,12 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration)

override private[streaming] def close(): Unit = synchronized {
closed = true
hflushOrSync()

@harishreedharan (Oct 7, 2014):
FSDataOutputStreams don't actually need to be flushed before close. A close guarantees that data is visible to readers (same as hflush). hflush + close causes additional round-trips to NN/DN.

@tdas (author, Oct 7, 2014):
Same reason as below.

stream.close()
}

private def hflushOrSync() {
stream.getWrappedStream.flush()

@harishreedharan (Oct 7, 2014):
The hflush/sync method already does this. This call makes hflushOrSync cost more than it needs to (since this call actually incurs a system call to flush to OS buffers). If this is for test purposes, I'd wrap it inside an if conditional for tests.

@tdas (author, Oct 7, 2014):
Oh yeah, that's just me experimenting with whether this helps or not. Will remove this, obviously. This is work in progress, so things are still in flux.

hflushMethod.foreach(_.invoke(stream))
}

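Following up on the review thread above: close() on an FSDataOutputStream already guarantees visibility to readers, so the hflush before close and the extra getWrappedStream.flush() are candidates for removal. Below is a hedged sketch of the reviewer's alternative of guarding the extra flush behind a test-only switch; this is not the commit's code, and the extraFlushForTests parameter is illustrative.

import java.lang.reflect.Method
import org.apache.hadoop.fs.FSDataOutputStream

// Illustrative only: keep the reflective hflush/sync call, and perform the extra
// Java-side flush to OS buffers only when a test explicitly asks for it.
def hflushOrSync(stream: FSDataOutputStream, hflushMethod: Option[Method], extraFlushForTests: Boolean): Unit = {
  if (extraFlushForTests) {
    stream.getWrappedStream.flush()       // flushes buffered bytes to the OS; costs a system call
  }
  hflushMethod.foreach(_.invoke(stream))  // hflush (Hadoop 2.x) or sync (Hadoop 1.x), looked up via reflection
}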
@@ -0,0 +1,138 @@
package org.apache.spark.streaming.storage

import java.io.File

import scala.concurrent.duration._
import scala.language.postfixOps

import akka.actor.{ActorSystem, Props}
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.AkkaUtils
import WriteAheadLogBasedBlockHandler._
import WriteAheadLogSuite._

class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers {

val conf = new SparkConf()
.set("spark.authenticate", "false")
.set("spark.kryoserializer.buffer.mb", "1")
.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
val hadoopConf = new Configuration()
val storageLevel = StorageLevel.MEMORY_ONLY_SER
val streamId = 1
val securityMgr = new SecurityManager(conf)
val mapOutputTracker = new MapOutputTrackerMaster(conf)
val shuffleManager = new HashShuffleManager(conf)
val serializer = new KryoSerializer(conf)
val manualClock = new ManualClock

var actorSystem: ActorSystem = null
var blockManagerMaster: BlockManagerMaster = null
var blockManager: BlockManager = null
var receivedBlockHandler: ReceivedBlockHandler = null
var tempDirectory: File = null

before {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"test", "localhost", 0, conf = conf, securityManager = securityMgr)
this.actorSystem = actorSystem

conf.set("spark.driver.port", boundPort.toString)

blockManagerMaster = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
conf, true)
blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer, 100000,
conf, mapOutputTracker, shuffleManager, new NioBlockTransferService(conf, securityMgr))
tempDirectory = Files.createTempDir()
manualClock.setTime(0)
}

after {
actorSystem.shutdown()
actorSystem.awaitTermination()
actorSystem = null
blockManagerMaster = null

if (blockManager != null) {
blockManager.stop()
blockManager = null
}

if (tempDirectory != null && tempDirectory.exists()) {
FileUtils.deleteDirectory(tempDirectory)
tempDirectory = null
}
}

test("WriteAheadLogBasedBlockHandler - store block") {
createWriteAheadLogBasedBlockHandler()
val (data, blockIds) = generateAndStoreData(receivedBlockHandler)
receivedBlockHandler.asInstanceOf[WriteAheadLogBasedBlockHandler].stop()

// Check whether blocks inserted in the block manager are correct
val blockManagerData = blockIds.flatMap { blockId =>
blockManager.getLocal(blockId).map { _.data }.getOrElse(Iterator.empty)
}
blockManagerData.toList shouldEqual data.toList

// Check whether the blocks written to the write ahead log are correct
val logFiles = getLogFiles()
logFiles.size should be > 1

val logData = logFiles.flatMap {
file => new WriteAheadLogReader(file.toString, hadoopConf).toSeq
}.flatMap { blockManager.dataDeserialize(StreamBlockId(streamId, 0), _ )}
logData.toList shouldEqual data.toList
}

test("WriteAheadLogBasedBlockHandler - clear old blocks") {
createWriteAheadLogBasedBlockHandler()
generateAndStoreData(receivedBlockHandler)
val preCleanupLogFiles = getLogFiles()
preCleanupLogFiles.size should be > 1

// this depends on the number of blocks inserted using generateAndStoreData()
manualClock.currentTime() shouldEqual 5000L

val cleanupThreshTime = 3000L
receivedBlockHandler.clearOldBlocks(cleanupThreshTime)
eventually(timeout(10000 millis), interval(10 millis)) {
getLogFiles().size should be < preCleanupLogFiles.size
}
}

def createWriteAheadLogBasedBlockHandler() {
receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
}

def generateAndStoreData(
receivedBlockHandler: ReceivedBlockHandler): (Seq[String], Seq[StreamBlockId]) = {
val data = (1 to 100).map { _.toString }
val blocks = data.grouped(10).map { _.toIterator }.toSeq
val blockIds = (0 until blocks.size).map { i => StreamBlockId(streamId, i) }
blocks.zip(blockIds).foreach { case (block, id) =>
manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf
receivedBlockHandler.storeBlock(id, IteratorBlock(block))
}
(data, blockIds)
}

def getLogFiles(): Seq[File] = {
getLogFilesInDirectory(
new File(checkpointDirToLogDir(tempDirectory.toString, streamId)))
}
}
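A short note on the timing assumptions baked into these tests: the rolling interval is set to 1 second via spark.streaming.receiver.writeAheadLog.rollingInterval, and the ManualClock advances 500 ms per stored block, so storing the 10 generated blocks pushes the clock to 5000 ms and forces the manager to roll over several log files, which is what makes the size and cleanup assertions meaningful. The arithmetic, spelled out with values taken from the suite above:

val rollingIntervalMs = 1000L   // "rollingInterval" of 1 (second) in the SparkConf above
val advancePerBlockMs = 500L    // manualClock.addToTime(500) before each storeBlock call
val numBlocks = 10              // (1 to 100) grouped into batches of 10
val finalClockTime = numBlocks * advancePerBlockMs  // 5000 ms, matching the shouldEqual 5000L check
// The clock crosses a 1000 ms boundary roughly every two blocks, so several log files are
// created (getLogFiles().size > 1), and clearOldBlocks(3000L) removes only the older ones.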