diff --git a/streaming/pom.xml b/streaming/pom.xml
index 12f900c91eb98..002c35f251390 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -68,6 +68,11 @@
      <artifactId>junit-interface</artifactId>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
index 052bb59c9c324..299d6069d7018 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
@@ -7,10 +7,12 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
+import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.{BlockManager, StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.util.{Clock, SystemClock}
import org.apache.spark.util.Utils
private[streaming] sealed trait ReceivedBlock
@@ -55,20 +57,22 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
storageLevel: StorageLevel,
conf: SparkConf,
hadoopConf: Configuration,
- checkpointDir: String
+ checkpointDir: String,
+ clock: Clock = new SystemClock
) extends ReceivedBlockHandler with Logging {
private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
- private val rotationInterval = conf.getInt(
- "spark.streaming.receiver.writeAheadLog.rotationInterval", 60)
+ private val rollingInterval = conf.getInt(
+ "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
private val maxFailures = conf.getInt(
"spark.streaming.receiver.writeAheadLog.maxFailures", 3)
private val logManager = new WriteAheadLogManager(
- new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString,
- hadoopConf, rotationInterval, maxFailures,
- "WriteAheadLogBasedHandler.WriteAheadLogManager"
+ checkpointDirToLogDir(checkpointDir, streamId),
+ hadoopConf, rollingInterval, maxFailures,
+ threadPoolName = "WriteAheadLogBasedHandler.WriteAheadLogManager",
+ clock = clock
)
implicit private val executionContext = ExecutionContext.fromExecutorService(
@@ -101,4 +105,14 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
def clearOldBlocks(threshTime: Long) {
logManager.clearOldLogs(threshTime)
}
+
+ def stop() {
+ logManager.stop()
+ }
+}
+
+private[streaming] object WriteAheadLogBasedBlockHandler {
+ def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
+ new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
+ }
}
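
The new checkpointDirToLogDir helper fixes the per-stream log directory layout
(<checkpointDir>/receivedData/<streamId>) so the handler and its tests resolve the same
path, and the optional clock parameter lets tests drive log rolling deterministically.
A minimal usage sketch, assuming the constructor shown above and that blockManager, conf
and hadoopConf are already in scope (the checkpoint directory, stream id and storage
level are illustrative):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.util.ManualClock

    // Inject a ManualClock so a test controls when the underlying
    // WriteAheadLogManager rolls over to a new log file.
    val clock = new ManualClock()
    val handler = new WriteAheadLogBasedBlockHandler(
      blockManager, 1, StorageLevel.MEMORY_ONLY_SER,
      conf, hadoopConf, "/tmp/checkpoint", clock)

    // Handler and verification code agree on the log location:
    // "/tmp/checkpoint/receivedData/1"
    val logDir =
      WriteAheadLogBasedBlockHandler.checkpointDirToLogDir("/tmp/checkpoint", 1)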
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
index 62086a3194cad..ff878e7b8bc31 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
@@ -9,7 +9,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.streaming.storage.WriteAheadLogManager._
-import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock}
+import org.apache.spark.streaming.util.{Clock, SystemClock}
import org.apache.spark.util.Utils
private[streaming] class WriteAheadLogManager(
@@ -55,17 +55,26 @@ private[streaming] class WriteAheadLogManager(
fileSegment
}
- def readFromLog(): Iterator[ByteBuffer] = {
- logsToIterator(pastLogs.map{ _.path}, hadoopConf)
+ def readFromLog(): Iterator[ByteBuffer] = synchronized {
+    val logFilesToRead = pastLogs.map { _.path } ++ Option(currentLogPath)
+    logDebug("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
+    logFilesToRead.iterator.map { file =>
+      logDebug(s"Creating log reader with $file")
+      new WriteAheadLogReader(file, hadoopConf)
+    }.flatMap { x => x }
}
+ /**
+ * Delete the log files that are older than the threshold time.
+ *
+   * It is important to note that the threshold time is based on the timestamps used in the log
+   * files, which in turn come from the local system time of the node writing them. So if the
+   * node computing the threshTime (say, the driver) and the node deleting the logs (say, a
+   * worker) can disagree on the time, the caller has to account for the possible clock skew.
+ */
def clearOldLogs(threshTime: Long): Unit = {
- // Get the log files that are older than the threshold time, while accounting for possible
- // time skew between the node issues the threshTime (say, driver node), and the local time at
- // the node this is being executed (say, worker node)
- val maxTimeSkewMs = 60 * 1000 // 1 minute
- val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime - maxTimeSkewMs } }
- logDebug(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
+ val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
def deleteFiles() {
@@ -87,6 +96,13 @@ private[streaming] class WriteAheadLogManager(
Future { deleteFiles() }
}
+ def stop(): Unit = synchronized {
+    logInfo("Stopping log manager")
+ if (currentLogWriter != null) {
+ currentLogWriter.close()
+ }
+ }
+
private def getLogWriter: WriteAheadLogWriter = synchronized {
val currentTime = clock.currentTime()
if (currentLogPath == null || currentTime > currentLogWriterStopTime) {
@@ -112,12 +128,13 @@ private[streaming] class WriteAheadLogManager(
pastLogs.clear()
pastLogs ++= logFilesTologInfo(logFiles)
logInfo(s"Recovered ${logFiles.size} log files from $logDirectory")
- logDebug(s"Recoved files are:\n${logFiles.mkString("\n")}")
+ logDebug(s"Recovered files are:\n${logFiles.mkString("\n")}")
}
}
private def resetWriter(): Unit = synchronized {
if (currentLogWriter != null) {
currentLogWriter.close()
currentLogWriter = null
}
@@ -170,15 +187,4 @@ private[storage] object WriteAheadLogManager {
}
}.sortBy { _.startTime }
}
-
- def logsToIterator(
- chronologicallySortedLogFiles: Seq[String],
- hadoopConf: Configuration
- ): Iterator[ByteBuffer] = {
- println("Creating iterator with " + chronologicallySortedLogFiles.mkString("[", "],[", "]"))
- chronologicallySortedLogFiles.iterator.map { file =>
- println(s"Creating log reader with $file")
- new WriteAheadLogReader(file, hadoopConf)
- } flatMap { x => x }
- }
}
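
With clearOldLogs no longer applying its own skew allowance, the caller decides how much
clock skew to tolerate between the node computing threshTime and the node deleting the
files, per the doc comment above. A short caller-side sketch, assuming a
WriteAheadLogManager named manager is in scope; the skew value is illustrative:

    import org.apache.spark.streaming.util.SystemClock

    // Apply any skew allowance before handing the threshold to the manager.
    val clock = new SystemClock()
    val allowedSkewMs = 60 * 1000                        // illustrative tolerance
    val threshTime = clock.currentTime() - allowedSkewMs
    manager.clearOldLogs(threshTime)                     // deletes logs with endTime < threshTime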
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
index 8a2db8305a7e2..f8f6089318344 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
@@ -55,10 +55,12 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration)
override private[streaming] def close(): Unit = synchronized {
closed = true
+ hflushOrSync()
stream.close()
}
private def hflushOrSync() {
+ stream.getWrappedStream.flush()
hflushMethod.foreach(_.invoke(stream))
}
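
close() now forces a flush before closing the stream, and hflushOrSync() flushes the
wrapped stream before invoking the Hadoop flush method found by reflection. For
reference, a sketch of how hflushMethod might be resolved, assuming the underlying
stream is a Hadoop FSDataOutputStream (hflush exists on Hadoop 2.x, sync on 1.x); this
illustrates the intent rather than reproducing the file's actual lookup code:

    import java.lang.reflect.Method
    import scala.util.Try
    import org.apache.hadoop.fs.FSDataOutputStream

    // Prefer hflush() (Hadoop 2.x), fall back to sync() (Hadoop 1.x); None if neither exists.
    def lookupHflushOrSync(): Option[Method] =
      Try(classOf[FSDataOutputStream].getMethod("hflush"))
        .orElse(Try(classOf[FSDataOutputStream].getMethod("sync")))
        .toOption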
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/ReceivedBlockHandlerSuite.scala
new file mode 100644
index 0000000000000..9c493210d2575
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/ReceivedBlockHandlerSuite.scala
@@ -0,0 +1,138 @@
+package org.apache.spark.streaming.storage
+
+import java.io.File
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import akka.actor.{ActorSystem, Props}
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage._
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.AkkaUtils
+import WriteAheadLogBasedBlockHandler._
+import WriteAheadLogSuite._
+
+class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers {
+
+ val conf = new SparkConf()
+ .set("spark.authenticate", "false")
+ .set("spark.kryoserializer.buffer.mb", "1")
+ .set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+ val hadoopConf = new Configuration()
+ val storageLevel = StorageLevel.MEMORY_ONLY_SER
+ val streamId = 1
+ val securityMgr = new SecurityManager(conf)
+ val mapOutputTracker = new MapOutputTrackerMaster(conf)
+ val shuffleManager = new HashShuffleManager(conf)
+ val serializer = new KryoSerializer(conf)
+ val manualClock = new ManualClock
+
+ var actorSystem: ActorSystem = null
+ var blockManagerMaster: BlockManagerMaster = null
+ var blockManager: BlockManager = null
+ var receivedBlockHandler: ReceivedBlockHandler = null
+ var tempDirectory: File = null
+
+ before {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+ this.actorSystem = actorSystem
+
+ conf.set("spark.driver.port", boundPort.toString)
+
+ blockManagerMaster = new BlockManagerMaster(
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+ conf, true)
+ blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer, 100000,
+ conf, mapOutputTracker, shuffleManager, new NioBlockTransferService(conf, securityMgr))
+ tempDirectory = Files.createTempDir()
+ manualClock.setTime(0)
+ }
+
+ after {
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ actorSystem = null
+ blockManagerMaster = null
+
+ if (blockManager != null) {
+ blockManager.stop()
+ blockManager = null
+ }
+
+ if (tempDirectory != null && tempDirectory.exists()) {
+ FileUtils.deleteDirectory(tempDirectory)
+ tempDirectory = null
+ }
+ }
+
+ test("WriteAheadLogBasedBlockHandler - store block") {
+ createWriteAheadLogBasedBlockHandler()
+ val (data, blockIds) = generateAndStoreData(receivedBlockHandler)
+ receivedBlockHandler.asInstanceOf[WriteAheadLogBasedBlockHandler].stop()
+
+ // Check whether blocks inserted in the block manager are correct
+ val blockManagerData = blockIds.flatMap { blockId =>
+ blockManager.getLocal(blockId).map { _.data }.getOrElse(Iterator.empty)
+ }
+ blockManagerData.toList shouldEqual data.toList
+
+ // Check whether the blocks written to the write ahead log are correct
+ val logFiles = getLogFiles()
+ logFiles.size should be > 1
+
+ val logData = logFiles.flatMap {
+ file => new WriteAheadLogReader(file.toString, hadoopConf).toSeq
+ }.flatMap { blockManager.dataDeserialize(StreamBlockId(streamId, 0), _ )}
+ logData.toList shouldEqual data.toList
+ }
+
+ test("WriteAheadLogBasedBlockHandler - clear old blocks") {
+ createWriteAheadLogBasedBlockHandler()
+ generateAndStoreData(receivedBlockHandler)
+ val preCleanupLogFiles = getLogFiles()
+ preCleanupLogFiles.size should be > 1
+
+ // this depends on the number of blocks inserted using generateAndStoreData()
+ manualClock.currentTime() shouldEqual 5000L
+
+ val cleanupThreshTime = 3000L
+ receivedBlockHandler.clearOldBlocks(cleanupThreshTime)
+ eventually(timeout(10000 millis), interval(10 millis)) {
+ getLogFiles().size should be < preCleanupLogFiles.size
+ }
+ }
+
+ def createWriteAheadLogBasedBlockHandler() {
+ receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
+ storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
+ }
+
+ def generateAndStoreData(
+ receivedBlockHandler: ReceivedBlockHandler): (Seq[String], Seq[StreamBlockId]) = {
+ val data = (1 to 100).map { _.toString }
+ val blocks = data.grouped(10).map { _.toIterator }.toSeq
+ val blockIds = (0 until blocks.size).map { i => StreamBlockId(streamId, i) }
+ blocks.zip(blockIds).foreach { case (block, id) =>
+ manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf
+ receivedBlockHandler.storeBlock(id, IteratorBlock(block))
+ }
+ (data, blockIds)
+ }
+
+ def getLogFiles(): Seq[File] = {
+ getLogFilesInDirectory(
+ new File(checkpointDirToLogDir(tempDirectory.toString, streamId)))
+ }
+}
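
The timing in this suite hinges on the 1 second rolling interval set in the SparkConf
and the 500 ms the ManualClock advances per stored block; the numbers below spell out
that arithmetic (a sketch, not part of the suite):

    // generateAndStoreData() stores 10 blocks, each advancing the ManualClock by 500 ms,
    // so the clock ends at 10 * 500 = 5000 ms (hence the shouldEqual 5000L assertion).
    // With a 1 s rolling interval the manager starts a new log file roughly every 1000 ms
    // of clock time, so more than one log file exists, and clearOldBlocks(3000L) removes
    // the files whose endTime is below 3000 ms.
    val blocks = 10
    val stepMs = 500
    assert(blocks * stepMs == 5000)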
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
index b2114dfbc7b12..1ba6e0e45bc16 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.storage
-import java.io.{File, RandomAccessFile}
+import java.io.{DataInputStream, FileInputStream, File, RandomAccessFile}
import java.nio.ByteBuffer
import scala.util.Random
@@ -29,9 +29,10 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.commons.io.FileUtils
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
+import WriteAheadLogSuite._
class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
-
+
val hadoopConf = new Configuration()
var tempDirectory: File = null
@@ -52,7 +53,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
val segments = dataToWrite.map(data => writer.write(data))
writer.close()
- val writtenData = readDataFromLogManually(file, segments)
+ val writtenData = readDataManually(file, segments)
assert(writtenData.toArray === dataToWrite.toArray)
}
@@ -62,17 +63,19 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
dataToWrite.foreach { data =>
val segment = writer.write(data)
- assert(readDataFromLogManually(file, Seq(segment)).head === data)
+ assert(readDataManually(file, Seq(segment)).head === data)
}
writer.close()
}
test("WriteAheadLogReader - sequentially reading data") {
+ // Write data manually for testing the sequential reader
val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
- val writtenData = writeRandomDataToLogManually(50, file)
+ val writtenData = generateRandomData()
+ writeDataManually(writtenData, file)
val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
val readData = reader.toSeq.map(byteBufferToString)
- assert(readData.toList === writtenData.map { _._1 }.toList)
+ assert(readData.toList === writtenData.toList)
assert(reader.hasNext === false)
intercept[Exception] {
reader.next()
@@ -81,6 +84,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}
test("WriteAheadLogReader - sequentially reading data written with writer") {
+ // Write data manually for testing the sequential reader
val file = new File(tempDirectory, "TestWriter")
val dataToWrite = generateRandomData()
writeDataUsingWriter(file, dataToWrite)
@@ -89,73 +93,130 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
reader.foreach { byteBuffer =>
assert(byteBufferToString(byteBuffer) === iter.next())
}
+ reader.close()
}
test("WriteAheadLogRandomReader - reading data using random reader") {
+ // Write data manually for testing the random reader
val file = File.createTempFile("TestRandomReads", "", tempDirectory)
- val writtenData = writeRandomDataToLogManually(50, file)
+ val writtenData = generateRandomData()
+ val segments = writeDataManually(writtenData, file)
+
+ // Get a random order of these segments and read them back
+ val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
- val reorderedWrittenData = writtenData.toSeq.permutations.next()
- reorderedWrittenData.foreach { case (data, offset, length) =>
- val segment = new FileSegment(file.toString, offset, length)
+ writtenDataAndSegments.foreach { case (data, segment) =>
assert(data === byteBufferToString(reader.read(segment)))
}
reader.close()
}
test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
+ // Write data using writer for testing the random reader
val file = new File(tempDirectory, "TestRandomReads")
- val dataToWrite = generateRandomData()
- val segments = writeDataUsingWriter(file, dataToWrite)
+ val data = generateRandomData()
+ val segments = writeDataUsingWriter(file, data)
+
+ // Read a random sequence of segments and verify read data
+ val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
- val writtenData = segments.map { byteBuffer => byteBufferToString(reader.read(byteBuffer)) }
- assert(dataToWrite.toList === writtenData.toList)
+    dataAndSegments.foreach { case (data, segment) =>
+ assert(data === byteBufferToString(reader.read(segment)))
+ }
+ reader.close()
}
- test("WriteAheadLogManager - write rotating logs and reading from them") {
- val fakeClock = new ManualClock
- val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
- rollingIntervalSecs = 1, clock = fakeClock)
+ test("WriteAheadLogManager - write rotating logs") {
+ // Write data using manager
val dataToWrite = generateRandomData(10)
- dataToWrite.foreach { data =>
- fakeClock.addToTime(500)
- manager.writeToLog(data)
- }
- println(tempDirectory.list().mkString("\n"))
- assert(tempDirectory.list().size > 1)
-
- val readData = manager.readFromLog().map(byteBufferToString).toSeq
- println("Generated data")
- printData(dataToWrite)
- println("Read data")
- printData(readData)
+ writeDataUsingManager(tempDirectory, dataToWrite)
+
+ // Read data manually to verify the written data
+ val logFiles = getLogFilesInDirectory(tempDirectory)
+ assert(logFiles.size > 1)
+ val writtenData = logFiles.flatMap { file => readDataManually(file) }
+ assert(writtenData.toList === dataToWrite.toList)
+ }
+
+ test("WriteAheadLogManager - read rotating logs") {
+ // Write data manually for testing reading through manager
+ val writtenData = (1 to 10).map { i =>
+ val data = generateRandomData(10)
+ val file = new File(tempDirectory, s"log-$i-${i + 1}")
+ writeDataManually(data, file)
+ println(s"Generated log file $file")
+ data
+ }.flatten
+
+ // Read data using manager and verify
+ val readData = readDataUsingManager(tempDirectory)
+ assert(readData.toList === writtenData.toList)
+ }
+
+ test("WriteAheadLogManager - recover past logs when creating new manager") {
+ // Write data with manager, recover with new manager and verify
+ val dataToWrite = generateRandomData(100)
+ writeDataUsingManager(tempDirectory, dataToWrite)
+ val logFiles = getLogFilesInDirectory(tempDirectory)
+ println("==========\n" + logFiles.mkString("\n") + "\n==========\n" )
+
+ assert(logFiles.size > 1)
+ val readData = readDataUsingManager(tempDirectory)
assert(dataToWrite.toList === readData.toList)
}
+ // TODO (Hari, TD): Test different failure conditions of writers and readers.
+ // - Failure in the middle of write
+ // - Failure while reading incomplete/corrupt file
+}
+
+object WriteAheadLogSuite {
+
+ private val hadoopConf = new Configuration()
+
/**
 * Write data to the file and returns an array of the bytes written.
* This is used to test the WAL reader independently of the WAL writer.
*/
- def writeRandomDataToLogManually(count: Int, file: File): ArrayBuffer[(String, Long, Int)] = {
- val writtenData = new ArrayBuffer[(String, Long, Int)]()
+ def writeDataManually(data: Seq[String], file: File): Seq[FileSegment] = {
+ val segments = new ArrayBuffer[FileSegment]()
val writer = new RandomAccessFile(file, "rw")
- for (i <- 1 to count) {
- val data = generateRandomData(1).head
+ data.foreach { item =>
val offset = writer.getFilePointer()
- val bytes = Utils.serialize(data)
+ val bytes = Utils.serialize(item)
writer.writeInt(bytes.size)
writer.write(bytes)
- writtenData += ((data, offset, bytes.size))
+ segments += FileSegment(file.toString, offset, bytes.size)
}
writer.close()
- writtenData
+ segments
+ }
+
+ def writeDataUsingWriter(file: File, data: Seq[String]): Seq[FileSegment] = {
+ val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
+ val segments = data.map {
+ item => writer.write(item)
+ }
+ writer.close()
+ segments
+ }
+
+ def writeDataUsingManager(logDirectory: File, data: Seq[String]) {
+ val fakeClock = new ManualClock
+ val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf,
+ rollingIntervalSecs = 1, clock = fakeClock)
+ data.foreach { item =>
+ fakeClock.addToTime(500)
+ manager.writeToLog(item)
+ }
+ manager.stop()
}
/**
 * Read data from the given segments of a log file and returns the list of byte buffers read.
* This is used to test the WAL writer independently of the WAL reader.
*/
- def readDataFromLogManually(file: File, segments: Seq[FileSegment]): Seq[String] = {
+ def readDataManually(file: File, segments: Seq[FileSegment]): Seq[String] = {
val reader = new RandomAccessFile(file, "r")
println("File " + file + " has " + reader.length() + " bytes ")
segments.map { x =>
@@ -167,22 +228,44 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}
}
+ def readDataManually(file: File): Seq[String] = {
+ val reader = new DataInputStream(new FileInputStream(file))
+ val buffer = new ArrayBuffer[String]
+ try {
+ while (reader.available > 0) {
+ val length = reader.readInt()
+ val bytes = new Array[Byte](length)
+ reader.read(bytes)
+ buffer += Utils.deserialize[String](bytes)
+ }
+ } finally {
+ reader.close()
+ }
+ buffer
+ }
+
+ def readDataUsingManager(logDirectory: File): Seq[String] = {
+ val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf)
+ val data = manager.readFromLog().map(byteBufferToString).toSeq
+ manager.stop()
+ data
+ }
+
def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
(1 to numItems).map { _.toString }
}
+ def getLogFilesInDirectory(directory: File): Seq[File] = {
+ println("[ " + directory.listFiles().filter(_.getName().startsWith("log-")).mkString(" | ") + " ]")
+ directory.listFiles().filter(_.getName().startsWith("log-"))
+ .sortBy(_.getName.split("-")(1).toLong)
+ }
+
def printData(data: Seq[String]) {
println("# items in data = " + data.size)
println(data.mkString("\n"))
}
- def writeDataUsingWriter(file: File, input: Seq[String]): Seq[FileSegment] = {
- val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
- val segments = input.map { data => writer.write(data) }
- writer.close()
- segments
- }
-
implicit def stringToByteBuffer(str: String): ByteBuffer = {
ByteBuffer.wrap(Utils.serialize(str))
}