From 5eafb2e2853f19558624d9e0a680fa6e9c0f88ab Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 6 Oct 2014 15:21:54 -0700
Subject: [PATCH] Added checkpoint data of ReceiverInputDStream, improved
 WALManager, refactored WALSuite.

---
 .../dstream/DStreamCheckpointData.scala       |   2 +-
 .../dstream/ReceiverInputDStream.scala        |  25 +-
 .../receiver/ReceiverSupervisorImpl.scala     |   2 +-
 .../streaming/scheduler/ReceiverTracker.scala |   4 +-
 ...nager.scala => ReceivedBlockHandler.scala} |  30 ++-
 .../storage/WriteAheadLogManager.scala        | 124 +++++++---
 .../storage/WriteAheadLogReader.scala         |  30 +--
 .../storage/WriteAheadLogSuite.scala          | 215 ++++++++++--------
 8 files changed, 288 insertions(+), 144 deletions(-)
 rename streaming/src/main/scala/org/apache/spark/streaming/storage/{ReceivedDataManager.scala => ReceivedBlockHandler.scala} (75%)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index f33c0ceafdf42..544cf57587e57 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -26,7 +26,7 @@ import org.apache.spark.Logging
 import org.apache.spark.streaming.Time
 
 private[streaming]
-class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
+class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
   extends Serializable with Logging {
 
   protected val data = new HashMap[Time, AnyRef]()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 391e40924f38a..e53af30836a0a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -40,7 +40,8 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
   extends InputDStream[T](ssc_) {
 
   /** Keeps all received blocks information */
-  private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+  private[streaming] lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
+  private[streaming] override val checkpointData = new ReceiverInputDStreamCheckpointData(this)
 
   /** This is an unique identifier for the network input stream. */
   val id = ssc.getNewReceiverStreamId()
@@ -92,3 +93,25 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
   }
 }
 
+private[streaming] class ReceiverInputDStreamCheckpointData[T: ClassTag](
+    dstream: ReceiverInputDStream[T]) extends DStreamCheckpointData[T](dstream) {
+
+  private var timeToReceivedBlockInfo: Seq[(Time, Array[ReceivedBlockInfo])] = _
+
+  override def update(time: Time) {
+    timeToReceivedBlockInfo = dstream.receivedBlockInfo.toSeq
+  }
+
+  override def cleanup(time: Time) {
+    // not required, as block info copied in whole every time update is called
+  }
+
+  override def restore() {
+    Option(timeToReceivedBlockInfo).foreach { blockInfo =>
+      dstream.receivedBlockInfo.clear()
+      dstream.receivedBlockInfo ++= blockInfo
+    }
+  }
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index ef2a22a363374..017cbae5060b5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -142,7 +142,7 @@ private[streaming] class ReceiverSupervisorImpl(
     }
 
     val time = System.currentTimeMillis
-    val fileSegmentOption = receivedBlockHandler.store(blockId, receivedBlock) match {
+    val fileSegmentOption = receivedBlockHandler.storeBlock(blockId, receivedBlock) match {
       case f: FileSegment => Some(f)
       case _ => None
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 081e1c4bd8141..36a55e64409b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -62,7 +62,7 @@ class ReceivedBlockInfoCheckpointer(
     logDirectory: String, conf: SparkConf, hadoopConf: Configuration) {
 
   private val logManager = new WriteAheadLogManager(
-    logDirectory, conf, hadoopConf, "ReceiverTracker.WriteAheadLogManager")
+    logDirectory, hadoopConf, threadPoolName = "ReceiverTracker.WriteAheadLogManager")
 
   def read(): Iterator[ReceivedBlockInfo] = {
     logManager.readFromLog().map { byteBuffer =>
@@ -76,7 +76,7 @@ class ReceivedBlockInfoCheckpointer(
   }
 
   def clear(threshTime: Long) {
-    logManager.clear(threshTime)
+    logManager.clearOldLogs(threshTime)
   }
 }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedDataManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
similarity index 75%
rename from streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedDataManager.scala
rename to streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
index 0c4db076eb357..052bb59c9c324 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedDataManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
@@ -19,8 +19,8 @@ private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends Recei
 private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock
 
 private[streaming] trait ReceivedBlockHandler {
-  def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef]
-  def clear(threshTime: Long) { }
+  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef]
+  def clearOldBlocks(threshTime: Long)
 }
 
 private[streaming] class BlockManagerBasedBlockHandler(
@@ -29,7 +29,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
     storageLevel: StorageLevel
   ) extends ReceivedBlockHandler {
 
-  def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
+  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
     receivedBlock match {
       case ArrayBufferBlock(arrayBuffer) =>
         blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
@@ -42,6 +42,11 @@ private[streaming] class BlockManagerBasedBlockHandler(
     }
     None
   }
+
+  def clearOldBlocks(threshTime: Long) {
+    // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
+    // of BlockRDDs.
+  }
 }
 
 private[streaming] class WriteAheadLogBasedBlockHandler(
@@ -53,18 +58,23 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     checkpointDir: String
   ) extends ReceivedBlockHandler with Logging {
 
+  private val blockStoreTimeout = conf.getInt(
+    "spark.streaming.receiver.blockStoreTimeout", 30).seconds
+  private val rotationInterval = conf.getInt(
+    "spark.streaming.receiver.writeAheadLog.rotationInterval", 60)
+  private val maxFailures = conf.getInt(
+    "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
+
   private val logManager = new WriteAheadLogManager(
     new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString,
-    conf, hadoopConf, "WriteAheadLogBasedHandler.WriteAheadLogManager"
+    hadoopConf, rotationInterval, maxFailures,
+    "WriteAheadLogBasedHandler.WriteAheadLogManager"
   )
 
-  private val blockStoreTimeout =
-    conf.getInt("spark.streaming.receiver.blockStoreTimeout", 30) seconds
-
   implicit private val executionContext = ExecutionContext.fromExecutorService(
     Utils.newDaemonFixedThreadPool(1, "WriteAheadLogBasedBlockHandler"))
 
-  def store(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
+  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
     val serializedBlock = receivedBlock match {
       case ArrayBufferBlock(arrayBuffer) =>
         blockManager.dataSerialize(blockId, arrayBuffer.iterator)
@@ -87,4 +97,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
     Some(Await.result(combinedFuture, blockStoreTimeout))
   }
+
+  def clearOldBlocks(threshTime: Long) {
+    logManager.clearOldLogs(threshTime)
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
index 8a2eda1962946..62086a3194cad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
@@ -7,18 +7,20 @@ import scala.concurrent.{ExecutionContext, Future}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.Logging
+import org.apache.spark.streaming.storage.WriteAheadLogManager._
+import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock}
 import org.apache.spark.util.Utils
 
-private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkConf,
-  hadoopConf: Configuration, threadPoolName: String = "WriteAheadLogManager") extends Logging {
+private[streaming] class WriteAheadLogManager(
+    logDirectory: String,
+    hadoopConf: Configuration,
+    rollingIntervalSecs: Int = 60,
+    maxFailures: Int = 3,
+    threadPoolName: String = "WriteAheadLogManager",
+    clock: Clock = new SystemClock
+  ) extends Logging {
 
-  private case class LogInfo(startTime: Long, endTime: Long, path: String)
-
-  private val logWriterChangeIntervalSeconds =
-    conf.getInt("spark.streaming.wal.interval", 60)
-  private val logWriterMaxFailures =
-    conf.getInt("spark.streaming.wal.maxRetries", 3)
   private val pastLogs = new ArrayBuffer[LogInfo]
   implicit private val executionContext =
     ExecutionContext.fromExecutorService(Utils.newDaemonFixedThreadPool(1, threadPoolName))
@@ -26,21 +28,24 @@ private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkC
   private var currentLogPath: String = null
   private var currentLogWriter: WriteAheadLogWriter = null
   private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  recoverPastLogs()
 
   def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
+    var fileSegment: FileSegment = null
     var failures = 0
     var lastException: Exception = null
-    var fileSegment: FileSegment = null
     var succeeded = false
-    while (!succeeded && failures < logWriterMaxFailures) {
+    while (!succeeded && failures < maxFailures) {
      try {
-        fileSegment = logWriter.write(byteBuffer)
+        fileSegment = getLogWriter.write(byteBuffer)
        succeeded = true
      } catch {
        case ex: Exception =>
          lastException = ex
          logWarning("Failed to ...")
-          reset()
+          resetWriter()
          failures += 1
      }
    }
@@ -51,10 +56,10 @@ private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkC
   }
 
   def readFromLog(): Iterator[ByteBuffer] = {
-    WriteAheadLogManager.logsToIterator(pastLogs sortBy { _.startTime} map { _.path}, hadoopConf)
+    logsToIterator(pastLogs.map{ _.path}, hadoopConf)
   }
 
-  def clear(threshTime: Long): Unit = {
+  def clearOldLogs(threshTime: Long): Unit = {
     // Get the log files that are older than the threshold time, while accounting for possible
     // time skew between the node issues the threshTime (say, driver node), and the local time at
     // the node this is being executed (say, worker node)
@@ -72,7 +77,8 @@ private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkC
           synchronized { pastLogs -= logInfo }
           logDebug(s"Cleared log file $logInfo")
         } catch {
-          case ex: Exception => logWarning("Could not delete ...")
+          case ex: Exception =>
+            logWarning(s"Error clearing log file $logInfo", ex)
         }
       }
       logInfo(s"Cleared log files in $logDirectory older than $threshTime")
@@ -81,31 +87,97 @@ private[streaming] class WriteAheadLogManager(logDirectory: String, conf: SparkC
     Future { deleteFiles() }
   }
 
-  private def logWriter: WriteAheadLogWriter = synchronized {
-    val currentTime = System.currentTimeMillis
-    if (currentLogWriter == null ||
-        currentTime - currentLogWriterStartTime > logWriterChangeIntervalSeconds * 1000) {
-      pastLogs += LogInfo(currentLogWriterStartTime, currentTime, currentLogPath)
-      val newLogPath = new Path(logDirectory, s"data-$currentTime".toString)
+  private def getLogWriter: WriteAheadLogWriter = synchronized {
+    val currentTime = clock.currentTime()
+    if (currentLogPath == null || currentTime > currentLogWriterStopTime) {
+      resetWriter()
+      if (currentLogPath != null) {
+        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
+      }
+      currentLogWriterStartTime = currentTime
+      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
+      val newLogPath = new Path(logDirectory,
+        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
       currentLogPath = newLogPath.toString
       currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
-      currentLogWriterStartTime = currentTime
     }
     currentLogWriter
   }
 
-  private def reset(): Unit = synchronized {
-    currentLogWriter.close()
-    currentLogWriter = null
+  private def recoverPastLogs(): Unit = synchronized {
+    val logDirectoryPath = new Path(logDirectory)
+    val fs = logDirectoryPath.getFileSystem(hadoopConf)
+    if (fs.exists(logDirectoryPath) && fs.getFileStatus(logDirectoryPath).isDir) {
+      val logFiles = fs.listStatus(logDirectoryPath).map { _.getPath }
+      pastLogs.clear()
+      pastLogs ++= logFilesTologInfo(logFiles)
+      logInfo(s"Recovered ${logFiles.size} log files from $logDirectory")
+      logDebug(s"Recovered files are:\n${logFiles.mkString("\n")}")
+    }
+  }
+
+  private def resetWriter(): Unit = synchronized {
+    if (currentLogWriter != null) {
+      currentLogWriter.close()
+      currentLogWriter = null
+    }
   }
+
+/*
+  private def tryMultipleTimes[T](message: String)(body: => T): T = {
+    var result: T = null.asInstanceOf[T]
+    var failures = 0
+    var lastException: Exception = null
+    var succeeded = false
+    while (!succeeded && failures < maxFailures) {
+      try {
+        result = body
+        succeeded = true
+      } catch {
+        case ex: Exception =>
+          lastException = ex
+          resetWriter()
+          failures += 1
+          logWarning(message, ex)
+      }
+    }
+    if (!succeeded) {
+      throw new Exception(s"Failed $message after $failures failures", lastException)
+    }
+    result
+  } */
 }
 
 private[storage] object WriteAheadLogManager {
+
+  case class LogInfo(startTime: Long, endTime: Long, path: String)
+
+  val logFileRegex = """log-(\d+)-(\d+)""".r
+
+  def timeToLogFile(startTime: Long, stopTime: Long): String = {
+    s"log-$startTime-$stopTime"
+  }
+
+  def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
+    println("Creating log info with " + files.mkString("[","],[","]"))
+    files.flatMap { file =>
+      logFileRegex.findFirstIn(file.getName()) match {
+        case logFileRegex(startTimeStr, stopTimeStr) =>
+          val startTime = startTimeStr.toLong
+          val stopTime = stopTimeStr.toLong
+          Some(LogInfo(startTime, stopTime, file.toString))
+        case _ => None
+      }
+    }.sortBy { _.startTime }
+  }
+
   def logsToIterator(
       chronologicallySortedLogFiles: Seq[String],
       hadoopConf: Configuration
     ): Iterator[ByteBuffer] = {
+    println("Creating iterator with " + chronologicallySortedLogFiles.mkString("[", "],[", "]"))
     chronologicallySortedLogFiles.iterator.map { file =>
+      println(s"Creating log reader with $file")
       new WriteAheadLogReader(file, hadoopConf)
     } flatMap { x => x }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
index 724549e216e93..35d3e7fb2b873 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala
@@ -29,7 +29,10 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
   private var nextItem: Option[ByteBuffer] = None
 
   override def hasNext: Boolean = synchronized {
-    assertOpen()
+    if (closed) {
+      return false
+    }
+
     if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
       true
     } else {
@@ -40,31 +43,30 @@ private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
         nextItem = Some(ByteBuffer.wrap(buffer))
         true
       } catch {
-        case e: EOFException => false
-        case e: Exception => throw e
+        case e: EOFException =>
+          close()
+          false
+        case e: Exception =>
+          close()
+          throw e
       }
     }
   }
 
   override def next(): ByteBuffer = synchronized {
-    // TODO: Possible error case where there are not enough bytes in the stream
-    // TODO: How to handle that?
     val data = nextItem.getOrElse {
-      throw new IllegalStateException("next called without calling hasNext or after hasNext " +
-        "returned false")
+      close()
+      throw new IllegalStateException(
+        "next called without calling hasNext or after hasNext returned false")
     }
     nextItem = None // Ensure the next hasNext call loads new data.
     data
   }
 
   override def close(): Unit = synchronized {
+    if (!closed) {
+      instream.close()
+    }
     closed = true
-    instream.close()
   }
-
-  private def assertOpen() {
-    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the " +
-      "file.")
-  }
-
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
index ed21bdbb399fd..b2114dfbc7b12 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
@@ -16,145 +16,178 @@
  */
 package org.apache.spark.streaming.storage
 
-import java.io.{RandomAccessFile, File}
+import java.io.{File, RandomAccessFile}
 import java.nio.ByteBuffer
-import java.util.Random
+import scala.util.Random
 import scala.collection.mutable.ArrayBuffer
+import scala.language.implicitConversions
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.commons.io.FileUtils
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.Utils
-import org.apache.spark.streaming.TestSuiteBase
 
-class WriteAheadLogSuite extends TestSuiteBase {
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
 
   val hadoopConf = new Configuration()
-  val random = new Random()
-
-  test("Test successful writes") {
-    val dir = Files.createTempDir()
-    val file = new File(dir, "TestWriter")
-    try {
-      val dataToWrite = for (i <- 1 to 50) yield generateRandomData()
-      val writer = new WriteAheadLogWriter("file:///" + file.toString, hadoopConf)
-      val segments = dataToWrite.map(writer.write)
-      writer.close()
-      val writtenData = readData(segments, file)
-      assert(writtenData.toArray === dataToWrite.toArray)
-    } finally {
-      file.delete()
-      dir.delete()
+  var tempDirectory: File = null
+
+  before {
+    tempDirectory = Files.createTempDir()
+  }
+
+  after {
+    if (tempDirectory != null && tempDirectory.exists()) {
+      FileUtils.deleteDirectory(tempDirectory)
+      tempDirectory = null
     }
   }
 
-  test("Test successful reads using random reader") {
-    val file = File.createTempFile("TestRandomReads", "")
-    file.deleteOnExit()
-    val writtenData = writeData(50, file)
-    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
-    writtenData.foreach {
-      x =>
-        val length = x._1.remaining()
-        assert(x._1 === reader.read(new FileSegment(file.toString, x._2, length)))
+  test("WriteAheadLogWriter - writing data") {
+    val file = new File(tempDirectory, Random.nextString(10))
+    val dataToWrite = generateRandomData()
+    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+    val segments = dataToWrite.map(data => writer.write(data))
+    writer.close()
+    val writtenData = readDataFromLogManually(file, segments)
+    assert(writtenData.toArray === dataToWrite.toArray)
+  }
+
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+    val file = new File(tempDirectory, Random.nextString(10))
+    val dataToWrite = generateRandomData()
+    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
+    dataToWrite.foreach { data =>
+      val segment = writer.write(data)
+      assert(readDataFromLogManually(file, Seq(segment)).head === data)
     }
-    reader.close()
+    writer.close()
   }
 
-  test("Test reading data using random reader written with writer") {
-    val dir = Files.createTempDir()
-    val file = new File(dir, "TestRandomReads")
-    try {
-      val dataToWrite = for (i <- 1 to 50) yield generateRandomData()
-      val segments = writeUsingWriter(file, dataToWrite)
-      val iter = dataToWrite.iterator
-      val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
-      val writtenData = segments.map { x =>
-        reader.read(x)
-      }
-      assert(dataToWrite.toArray === writtenData.toArray)
-    } finally {
-      file.delete()
-      dir.delete()
+  test("WriteAheadLogReader - sequentially reading data") {
+    val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
+    val writtenData = writeRandomDataToLogManually(50, file)
+    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
+    val readData = reader.toSeq.map(byteBufferToString)
+    assert(readData.toList === writtenData.map { _._1 }.toList)
+    assert(reader.hasNext === false)
+    intercept[Exception] {
+      reader.next()
     }
+    reader.close()
   }
 
-  test("Test successful reads using sequential reader") {
-    val file = File.createTempFile("TestSequentialReads", "")
-    file.deleteOnExit()
-    val writtenData = writeData(50, file)
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
+    val file = new File(tempDirectory, "TestWriter")
+    val dataToWrite = generateRandomData()
+    writeDataUsingWriter(file, dataToWrite)
+    val iter = dataToWrite.iterator
     val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
-    val iter = writtenData.iterator
-    iter.foreach { x =>
-      assert(reader.hasNext === true)
-      assert(reader.next() === x._1)
+    reader.foreach { byteBuffer =>
+      assert(byteBufferToString(byteBuffer) === iter.next())
+    }
+  }
+
+  test("WriteAheadLogRandomReader - reading data using random reader") {
+    val file = File.createTempFile("TestRandomReads", "", tempDirectory)
+    val writtenData = writeRandomDataToLogManually(50, file)
+    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
+    val reorderedWrittenData = writtenData.toSeq.permutations.next()
+    reorderedWrittenData.foreach { case (data, offset, length) =>
+      val segment = new FileSegment(file.toString, offset, length)
+      assert(data === byteBufferToString(reader.read(segment)))
     }
     reader.close()
   }
 
+  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
+    val file = new File(tempDirectory, "TestRandomReads")
+    val dataToWrite = generateRandomData()
+    val segments = writeDataUsingWriter(file, dataToWrite)
+    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
+    val writtenData = segments.map { byteBuffer => byteBufferToString(reader.read(byteBuffer)) }
+    assert(dataToWrite.toList === writtenData.toList)
+  }
 
-  test("Test reading data using sequential reader written with writer") {
-    val dir = Files.createTempDir()
-    val file = new File(dir, "TestWriter")
-    try {
-      val dataToWrite = for (i <- 1 to 50) yield generateRandomData()
-      val segments = writeUsingWriter(file, dataToWrite)
-      val iter = dataToWrite.iterator
-      val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
-      reader.foreach { x =>
-        assert(x === iter.next())
-      }
-    } finally {
-      file.delete()
-      dir.delete()
+  test("WriteAheadLogManager - write rotating logs and reading from them") {
+    val fakeClock = new ManualClock
+    val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
+      rollingIntervalSecs = 1, clock = fakeClock)
+    val dataToWrite = generateRandomData(10)
+    dataToWrite.foreach { data =>
+      fakeClock.addToTime(500)
+      manager.writeToLog(data)
     }
+    println(tempDirectory.list().mkString("\n"))
+    assert(tempDirectory.list().size > 1)
+
+    val readData = manager.readFromLog().map(byteBufferToString).toSeq
+    println("Generated data")
+    printData(dataToWrite)
+    println("Read data")
+    printData(readData)
+    assert(dataToWrite.toList === readData.toList)
   }
 
   /**
-   * Writes data to the file and returns the an array of the bytes written.
-   * @param count
-   * @return
+   * Write data to the file and return an array of the items written.
+   * This is used to test the WAL reader independently of the WAL writer.
    */
-  // We don't want to be using the WAL writer to test the reader - it will be painful to figure
-  // out where the bug is. Instead generate the file by hand and see if the WAL reader can
-  // handle it.
-  def writeData(count: Int, file: File): ArrayBuffer[(ByteBuffer, Long)] = {
-    val writtenData = new ArrayBuffer[(ByteBuffer, Long)]()
+  def writeRandomDataToLogManually(count: Int, file: File): ArrayBuffer[(String, Long, Int)] = {
+    val writtenData = new ArrayBuffer[(String, Long, Int)]()
     val writer = new RandomAccessFile(file, "rw")
-    var i = 0
-    while (i < count) {
-      val data = generateRandomData()
-      writtenData += ((data, writer.getFilePointer))
-      data.rewind()
-      writer.writeInt(data.remaining())
-      writer.write(data.array())
-      i += 1
+    for (i <- 1 to count) {
+      val data = generateRandomData(1).head
+      val offset = writer.getFilePointer()
+      val bytes = Utils.serialize(data)
+      writer.writeInt(bytes.size)
+      writer.write(bytes)
+      writtenData += ((data, offset, bytes.size))
     }
     writer.close()
     writtenData
   }
 
-  def readData(segments: Seq[FileSegment], file: File): Seq[ByteBuffer] = {
+  /**
+   * Read data from the given segments of a log file and return the list of items read.
+   * This is used to test the WAL writer independently of the WAL reader.
+   */
+  def readDataFromLogManually(file: File, segments: Seq[FileSegment]): Seq[String] = {
     val reader = new RandomAccessFile(file, "r")
+    println("File " + file + " has " + reader.length() + " bytes ")
     segments.map { x =>
       reader.seek(x.offset)
       val data = new Array[Byte](x.length)
       reader.readInt()
       reader.readFully(data)
-      ByteBuffer.wrap(data)
+      Utils.deserialize[String](data)
     }
   }
 
-  def generateRandomData(): ByteBuffer = {
-    val data = new Array[Byte](random.nextInt(50))
-    random.nextBytes(data)
-    ByteBuffer.wrap(data)
+  def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
+    (1 to numItems).map { _.toString }
  }
 
-  def writeUsingWriter(file: File, input: Seq[ByteBuffer]): Seq[FileSegment] = {
+  def printData(data: Seq[String]) {
+    println("# items in data = " + data.size)
+    println(data.mkString("\n"))
+  }
+
+  def writeDataUsingWriter(file: File, input: Seq[String]): Seq[FileSegment] = {
     val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
-    val segments = input.map(writer.write)
+    val segments = input.map { data => writer.write(data) }
     writer.close()
     segments
   }
+
+  implicit def stringToByteBuffer(str: String): ByteBuffer = {
+    ByteBuffer.wrap(Utils.serialize(str))
  }
+
+  implicit def byteBufferToString(byteBuffer: ByteBuffer): String = {
+    Utils.deserialize[String](byteBuffer.array)
+  }
 }