From b86af2dbf1a7d7e3f3803314d9d42a56c4c06cce Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 7 Oct 2014 11:11:20 -0700
Subject: [PATCH] Improved WALSuite, and added ReceivedBlockHandlerSuite.

---
 streaming/pom.xml                             |   5 +
 .../storage/ReceivedBlockHandler.scala        |  26 ++-
 .../storage/WriteAheadLogManager.scala        |  48 ++---
 .../storage/WriteAheadLogWriter.scala         |   2 +
 .../storage/ReceivedBlockHandlerSuite.scala   | 138 ++++++++++++++
 .../storage/WriteAheadLogSuite.scala          | 173 +++++++++++++-----
 6 files changed, 320 insertions(+), 72 deletions(-)
 create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/storage/ReceivedBlockHandlerSuite.scala

diff --git a/streaming/pom.xml b/streaming/pom.xml
index 12f900c91eb98..002c35f251390 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
index 052bb59c9c324..299d6069d7018 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/ReceivedBlockHandler.scala
@@ -7,10 +7,12 @@ import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import WriteAheadLogBasedBlockHandler._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.{BlockManager, StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.util.{Clock, SystemClock}
 import org.apache.spark.util.Utils
 
 private[streaming] sealed trait ReceivedBlock
@@ -55,20 +57,22 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     storageLevel: StorageLevel,
     conf: SparkConf,
     hadoopConf: Configuration,
-    checkpointDir: String
+    checkpointDir: String,
+    clock: Clock = new SystemClock
   ) extends ReceivedBlockHandler with Logging {
 
   private val blockStoreTimeout = conf.getInt(
     "spark.streaming.receiver.blockStoreTimeout", 30).seconds
-  private val rotationInterval = conf.getInt(
-    "spark.streaming.receiver.writeAheadLog.rotationInterval", 60)
+  private val rollingInterval = conf.getInt(
+    "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
   private val maxFailures = conf.getInt(
     "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
 
   private val logManager = new WriteAheadLogManager(
-    new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString,
-    hadoopConf, rotationInterval, maxFailures,
-    "WriteAheadLogBasedHandler.WriteAheadLogManager"
+    checkpointDirToLogDir(checkpointDir, streamId),
+    hadoopConf, rollingInterval, maxFailures,
+    threadPoolName = "WriteAheadLogBasedHandler.WriteAheadLogManager",
+    clock = clock
   )
 
   implicit private val executionContext = ExecutionContext.fromExecutorService(
@@ -101,4 +105,14 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   def clearOldBlocks(threshTime: Long) {
     logManager.clearOldLogs(threshTime)
   }
+
+  def stop() {
+    logManager.stop()
+  }
+}
+
+private[streaming] object WriteAheadLogBasedBlockHandler {
+  def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
+    new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
+  }
 }
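A quick illustration of the configuration surface added above and of the per-stream log directory produced by the new checkpointDirToLogDir helper. This is a standalone sketch, not part of the patch: the object name, the /tmp/checkpoint path, and the values set on the SparkConf are made up, and it only assumes spark-core and hadoop-common on the classpath.

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf

object WalHandlerConfigSketch {
  def main(args: Array[String]): Unit = {
    // The receiver-side settings WriteAheadLogBasedBlockHandler reads (example values).
    val conf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.rollingInterval", "60")
      .set("spark.streaming.receiver.writeAheadLog.maxFailures", "3")
      .set("spark.streaming.receiver.blockStoreTimeout", "30")

    // Read them back the same way the handler does, with the same defaults as the patch.
    val rollingIntervalSecs = conf.getInt("spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
    val maxFailures = conf.getInt("spark.streaming.receiver.writeAheadLog.maxFailures", 3)
    val blockStoreTimeoutSecs = conf.getInt("spark.streaming.receiver.blockStoreTimeout", 30)

    // Same path composition as checkpointDirToLogDir: <checkpointDir>/receivedData/<streamId>
    val streamId = 1
    val logDir = new Path("/tmp/checkpoint", new Path("receivedData", streamId.toString)).toString

    println(s"WAL for stream $streamId would live under $logDir, roll every $rollingIntervalSecs s, " +
      s"tolerate $maxFailures write failures, and wait up to $blockStoreTimeoutSecs s per block store")
  }
}

With these settings the handler stores each received block both in the BlockManager and in a log file under that directory, rolling to a new log file every rollingInterval seconds; the new ReceivedBlockHandlerSuite further down verifies exactly that behaviour.
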
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
index 62086a3194cad..ff878e7b8bc31 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala
@@ -9,7 +9,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
 import org.apache.spark.streaming.storage.WriteAheadLogManager._
-import org.apache.spark.streaming.util.{Clock, ManualClock, SystemClock}
+import org.apache.spark.streaming.util.{Clock, SystemClock}
 import org.apache.spark.util.Utils
 
 private[streaming] class WriteAheadLogManager(
@@ -55,17 +55,26 @@ private[streaming] class WriteAheadLogManager(
     fileSegment
   }
 
-  def readFromLog(): Iterator[ByteBuffer] = {
-    logsToIterator(pastLogs.map{ _.path}, hadoopConf)
+  def readFromLog(): Iterator[ByteBuffer] = synchronized {
+    val logFilesToRead = pastLogs.map { _.path } ++ Option(currentLogPath)
+    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
+    logFilesToRead.iterator.map { file =>
+      logDebug(s"Creating log reader with $file")
+      new WriteAheadLogReader(file, hadoopConf)
+    } flatMap { x => x }
   }
 
+  /**
+   * Delete the log files that are older than the threshold time.
+   *
+   * It's important to note that the threshold time is based on the timestamps used in the log
+   * files, which in turn come from the local system clock of the machine that wrote them. So if
+   * the node calculating the threshTime (say, the driver) and the node that wrote the logs (say,
+   * a worker) have drifted apart, the caller has to account for the possible time skew.
+   */
   def clearOldLogs(threshTime: Long): Unit = {
-    // Get the log files that are older than the threshold time, while accounting for possible
-    // time skew between the node issues the threshTime (say, driver node), and the local time at
-    // the node this is being executed (say, worker node)
-    val maxTimeSkewMs = 60 * 1000 // 1 minute
-    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime - maxTimeSkewMs } }
-    logDebug(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
+    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
       s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
 
     def deleteFiles() {
@@ -87,6 +96,13 @@
     Future { deleteFiles() }
   }
 
+  def stop(): Unit = synchronized {
+    logInfo("Stopping log manager")
+    if (currentLogWriter != null) {
+      currentLogWriter.close()
+    }
+  }
+
   private def getLogWriter: WriteAheadLogWriter = synchronized {
     val currentTime = clock.currentTime()
     if (currentLogPath == null || currentTime > currentLogWriterStopTime) {
@@ -112,12 +128,12 @@
       pastLogs.clear()
       pastLogs ++= logFilesTologInfo(logFiles)
       logInfo(s"Recovered ${logFiles.size} log files from $logDirectory")
-      logDebug(s"Recoved files are:\n${logFiles.mkString("\n")}")
+      logDebug(s"Recovered files are:\n${logFiles.mkString("\n")}")
     }
   }
 
   private def resetWriter(): Unit = synchronized {
     if (currentLogWriter != null) {
       currentLogWriter.close()
       currentLogWriter = null
     }
@@ -170,15 +187,4 @@ private[storage] object WriteAheadLogManager {
       }
     }.sortBy { _.startTime }
   }
-
-  def logsToIterator(
-      chronologicallySortedLogFiles: Seq[String],
-      hadoopConf: Configuration
-    ): Iterator[ByteBuffer] = {
-    println("Creating iterator with " + chronologicallySortedLogFiles.mkString("[", "],[", "]"))
-    chronologicallySortedLogFiles.iterator.map { file =>
-      println(s"Creating log reader with $file")
-      new WriteAheadLogReader(file, hadoopConf)
-    } flatMap { x => x }
-  }
 }
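The clearOldLogs hunk above removes the built-in one-minute maxTimeSkewMs allowance, so the method now deletes strictly by the caller-supplied threshold and the new scaladoc pushes clock-skew handling onto the caller. A minimal sketch of what a caller might now do; the helper name, the retention window, and the 60-second allowance are illustrative, not code from this patch.

// Assumes an already-constructed WriteAheadLogManager from this patch.
def clearWithSkewAllowance(logManager: WriteAheadLogManager, retentionMs: Long): Unit = {
  val maxTimeSkewMs = 60 * 1000L  // example allowance for driver/worker clock drift
  val threshTime = System.currentTimeMillis() - retentionMs
  // Only log files whose endTime is older than the adjusted threshold are deleted.
  logManager.clearOldLogs(threshTime - maxTimeSkewMs)
}
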
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
index 8a2db8305a7e2..f8f6089318344 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala
@@ -55,10 +55,12 @@ private[streaming] class WriteAheadLogWriter(path: String, conf: Configuration)
 
   override private[streaming] def close(): Unit = synchronized {
     closed = true
+    hflushOrSync()
     stream.close()
   }
 
   private def hflushOrSync() {
+    stream.getWrappedStream.flush()
     hflushMethod.foreach(_.invoke(stream))
   }
 
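The hflushMethod invoked above is defined elsewhere in WriteAheadLogWriter and is not shown in this hunk. For context, that kind of reflective lookup is typically written along the lines below, so that close() still forces data out via hflush() on Hadoop 2.x and degrades to sync() on Hadoop 1.x; this helper is an illustrative assumption, not code from the patch.

import scala.util.Try
import org.apache.hadoop.fs.FSDataOutputStream

// Prefer hflush() (Hadoop 2.x), fall back to sync() (Hadoop 1.x). If neither exists,
// only the plain flush() on the wrapped stream added in this patch runs.
def lookupHflushOrSync(): Option[java.lang.reflect.Method] = {
  Try(classOf[FSDataOutputStream].getMethod("hflush"))
    .orElse(Try(classOf[FSDataOutputStream].getMethod("sync")))
    .toOption
}
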
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/ReceivedBlockHandlerSuite.scala
new file mode 100644
index 0000000000000..9c493210d2575
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/ReceivedBlockHandlerSuite.scala
@@ -0,0 +1,138 @@
+package org.apache.spark.streaming.storage
+
+import java.io.File
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import akka.actor.{ActorSystem, Props}
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage._
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.AkkaUtils
+import WriteAheadLogBasedBlockHandler._
+import WriteAheadLogSuite._
+
+class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers {
+
+  val conf = new SparkConf()
+    .set("spark.authenticate", "false")
+    .set("spark.kryoserializer.buffer.mb", "1")
+    .set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+  val hadoopConf = new Configuration()
+  val storageLevel = StorageLevel.MEMORY_ONLY_SER
+  val streamId = 1
+  val securityMgr = new SecurityManager(conf)
+  val mapOutputTracker = new MapOutputTrackerMaster(conf)
+  val shuffleManager = new HashShuffleManager(conf)
+  val serializer = new KryoSerializer(conf)
+  val manualClock = new ManualClock
+
+  var actorSystem: ActorSystem = null
+  var blockManagerMaster: BlockManagerMaster = null
+  var blockManager: BlockManager = null
+  var receivedBlockHandler: ReceivedBlockHandler = null
+  var tempDirectory: File = null
+
+  before {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+    this.actorSystem = actorSystem
+
+    conf.set("spark.driver.port", boundPort.toString)
+
+    blockManagerMaster = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf, true)
+    blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer, 100000,
+      conf, mapOutputTracker, shuffleManager, new NioBlockTransferService(conf, securityMgr))
+    tempDirectory = Files.createTempDir()
+    manualClock.setTime(0)
+  }
+
+  after {
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+    actorSystem = null
+    blockManagerMaster = null
+
+    if (blockManager != null) {
+      blockManager.stop()
+      blockManager = null
+    }
+
+    if (tempDirectory != null && tempDirectory.exists()) {
+      FileUtils.deleteDirectory(tempDirectory)
+      tempDirectory = null
+    }
+  }
+
+  test("WriteAheadLogBasedBlockHandler - store block") {
+    createWriteAheadLogBasedBlockHandler()
+    val (data, blockIds) = generateAndStoreData(receivedBlockHandler)
+    receivedBlockHandler.asInstanceOf[WriteAheadLogBasedBlockHandler].stop()
+
+    // Check whether blocks inserted in the block manager are correct
+    val blockManagerData = blockIds.flatMap { blockId =>
+      blockManager.getLocal(blockId).map { _.data }.getOrElse(Iterator.empty)
+    }
+    blockManagerData.toList shouldEqual data.toList
+
+    // Check whether the blocks written to the write ahead log are correct
+    val logFiles = getLogFiles()
+    logFiles.size should be > 1
+
+    val logData = logFiles.flatMap {
+      file => new WriteAheadLogReader(file.toString, hadoopConf).toSeq
+    }.flatMap { blockManager.dataDeserialize(StreamBlockId(streamId, 0), _) }
+    logData.toList shouldEqual data.toList
+  }
+
+  test("WriteAheadLogBasedBlockHandler - clear old blocks") {
+    createWriteAheadLogBasedBlockHandler()
+    generateAndStoreData(receivedBlockHandler)
+    val preCleanupLogFiles = getLogFiles()
+    preCleanupLogFiles.size should be > 1
+
+    // generateAndStoreData() stores 10 blocks and advances the manual clock 500 ms per block
+    manualClock.currentTime() shouldEqual 5000L
+
+    val cleanupThreshTime = 3000L
+    receivedBlockHandler.clearOldBlocks(cleanupThreshTime)
+    eventually(timeout(10000 millis), interval(10 millis)) {
+      getLogFiles().size should be < preCleanupLogFiles.size
+    }
+  }
+
+  def createWriteAheadLogBasedBlockHandler() {
+    receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
+      storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
+  }
+
+  def generateAndStoreData(
+      receivedBlockHandler: ReceivedBlockHandler): (Seq[String], Seq[StreamBlockId]) = {
+    val data = (1 to 100).map { _.toString }
+    val blocks = data.grouped(10).map { _.toIterator }.toSeq
+    val blockIds = (0 until blocks.size).map { i => StreamBlockId(streamId, i) }
+    blocks.zip(blockIds).foreach { case (block, id) =>
+      manualClock.addToTime(500)  // log rolling interval set to 1000 ms through SparkConf
+      receivedBlockHandler.storeBlock(id, IteratorBlock(block))
+    }
+    (data, blockIds)
+  }
+
+  def getLogFiles(): Seq[File] = {
+    getLogFilesInDirectory(
+      new File(checkpointDirToLogDir(tempDirectory.toString, streamId)))
+  }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
index b2114dfbc7b12..1ba6e0e45bc16 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.storage
 
-import java.io.{File, RandomAccessFile}
+import java.io.{DataInputStream, FileInputStream, File, RandomAccessFile}
 import java.nio.ByteBuffer
 
 import scala.util.Random
@@ -29,9 +29,10 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
+import WriteAheadLogSuite._
 
 class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
-  
+
   val hadoopConf = new Configuration()
   var tempDirectory: File = null
@@ -52,7 +53,7 @@
     val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
     val segments = dataToWrite.map(data => writer.write(data))
     writer.close()
-    val writtenData = readDataFromLogManually(file, segments)
+    val writtenData = readDataManually(file, segments)
     assert(writtenData.toArray === dataToWrite.toArray)
   }
@@ -62,17 +63,19 @@
     val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
     dataToWrite.foreach { data =>
       val segment = writer.write(data)
-      assert(readDataFromLogManually(file, Seq(segment)).head === data)
+      assert(readDataManually(file, Seq(segment)).head === data)
     }
     writer.close()
   }
 
   test("WriteAheadLogReader - sequentially reading data") {
+    // Write data manually for testing the sequential reader
     val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
-    val writtenData = writeRandomDataToLogManually(50, file)
+    val writtenData = generateRandomData()
+    writeDataManually(writtenData, file)
     val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
     val readData = reader.toSeq.map(byteBufferToString)
-    assert(readData.toList === writtenData.map { _._1 }.toList)
+    assert(readData.toList === writtenData.toList)
     assert(reader.hasNext === false)
     intercept[Exception] {
       reader.next()
@@ -81,6 +84,7 @@
   }
 
   test("WriteAheadLogReader - sequentially reading data written with writer") {
+    // Write data using the writer for testing the sequential reader
     val file = new File(tempDirectory, "TestWriter")
     val dataToWrite = generateRandomData()
     writeDataUsingWriter(file, dataToWrite)
@@ -89,73 +93,130 @@
     reader.foreach { byteBuffer =>
       assert(byteBufferToString(byteBuffer) === iter.next())
     }
+    reader.close()
   }
 
   test("WriteAheadLogRandomReader - reading data using random reader") {
+    // Write data manually for testing the random reader
     val file = File.createTempFile("TestRandomReads", "", tempDirectory)
-    val writtenData = writeRandomDataToLogManually(50, file)
+    val writtenData = generateRandomData()
+    val segments = writeDataManually(writtenData, file)
+
+    // Get a random order of these segments and read them back
+    val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
     val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
-    val reorderedWrittenData = writtenData.toSeq.permutations.next()
-    reorderedWrittenData.foreach { case (data, offset, length) =>
-      val segment = new FileSegment(file.toString, offset, length)
+    writtenDataAndSegments.foreach { case (data, segment) =>
       assert(data === byteBufferToString(reader.read(segment)))
     }
     reader.close()
   }
 
   test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
+    // Write data using writer for testing the random reader
     val file = new File(tempDirectory, "TestRandomReads")
-    val dataToWrite = generateRandomData()
-    val segments = writeDataUsingWriter(file, dataToWrite)
+    val data = generateRandomData()
+    val segments = writeDataUsingWriter(file, data)
+
+    // Read a random sequence of segments and verify read data
+    val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
     val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
-    val writtenData = segments.map { byteBuffer => byteBufferToString(reader.read(byteBuffer)) }
-    assert(dataToWrite.toList === writtenData.toList)
+    dataAndSegments.foreach { case (data, segment) =>
+      assert(data === byteBufferToString(reader.read(segment)))
+    }
     reader.close()
   }
 
-  test("WriteAheadLogManager - write rotating logs and reading from them") {
-    val fakeClock = new ManualClock
-    val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
-      rollingIntervalSecs = 1, clock = fakeClock)
+  test("WriteAheadLogManager - write rotating logs") {
+    // Write data using manager
     val dataToWrite = generateRandomData(10)
-    dataToWrite.foreach { data =>
-      fakeClock.addToTime(500)
-      manager.writeToLog(data)
-    }
-    println(tempDirectory.list().mkString("\n"))
-    assert(tempDirectory.list().size > 1)
-
-    val readData = manager.readFromLog().map(byteBufferToString).toSeq
-    println("Generated data")
-    printData(dataToWrite)
-    println("Read data")
-    printData(readData)
+    writeDataUsingManager(tempDirectory, dataToWrite)
+
+    // Read data manually to verify the written data
+    val logFiles = getLogFilesInDirectory(tempDirectory)
+    assert(logFiles.size > 1)
+    val writtenData = logFiles.flatMap { file => readDataManually(file) }
+    assert(writtenData.toList === dataToWrite.toList)
+  }
+
+  test("WriteAheadLogManager - read rotating logs") {
+    // Write data manually for testing reading through manager
+    val writtenData = (1 to 10).map { i =>
+      val data = generateRandomData(10)
+      val file = new File(tempDirectory, s"log-$i-${i + 1}")
+      writeDataManually(data, file)
+      println(s"Generated log file $file")
+      data
+    }.flatten
+
+    // Read data using manager and verify
+    val readData = readDataUsingManager(tempDirectory)
+    assert(readData.toList === writtenData.toList)
+  }
+
+  test("WriteAheadLogManager - recover past logs when creating new manager") {
+    // Write data with manager, recover with new manager and verify
+    val dataToWrite = generateRandomData(100)
+    writeDataUsingManager(tempDirectory, dataToWrite)
+    val logFiles = getLogFilesInDirectory(tempDirectory)
+    println("==========\n" + logFiles.mkString("\n") + "\n==========\n")
+
+    assert(logFiles.size > 1)
+    val readData = readDataUsingManager(tempDirectory)
     assert(dataToWrite.toList === readData.toList)
   }
 
+  // TODO (Hari, TD): Test different failure conditions of writers and readers.
+  //  - Failure in the middle of write
+  //  - Failure while reading incomplete/corrupt file
+}
+
+object WriteAheadLogSuite {
+
+  private val hadoopConf = new Configuration()
+
   /**
    * Write data to the file and returns the file segments written.
    * This is used to test the WAL reader independently of the WAL writer.
    */
-  def writeRandomDataToLogManually(count: Int, file: File): ArrayBuffer[(String, Long, Int)] = {
-    val writtenData = new ArrayBuffer[(String, Long, Int)]()
+  def writeDataManually(data: Seq[String], file: File): Seq[FileSegment] = {
+    val segments = new ArrayBuffer[FileSegment]()
     val writer = new RandomAccessFile(file, "rw")
-    for (i <- 1 to count) {
-      val data = generateRandomData(1).head
+    data.foreach { item =>
       val offset = writer.getFilePointer()
-      val bytes = Utils.serialize(data)
+      val bytes = Utils.serialize(item)
       writer.writeInt(bytes.size)
       writer.write(bytes)
-      writtenData += ((data, offset, bytes.size))
+      segments += FileSegment(file.toString, offset, bytes.size)
     }
     writer.close()
-    writtenData
+    segments
+  }
+
+  def writeDataUsingWriter(file: File, data: Seq[String]): Seq[FileSegment] = {
+    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
+    val segments = data.map {
+      item => writer.write(item)
+    }
+    writer.close()
+    segments
+  }
+
+  def writeDataUsingManager(logDirectory: File, data: Seq[String]) {
+    val fakeClock = new ManualClock
+    val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf,
+      rollingIntervalSecs = 1, clock = fakeClock)
+    data.foreach { item =>
+      fakeClock.addToTime(500)
+      manager.writeToLog(item)
+    }
+    manager.stop()
   }
 
   /**
   * Read data from the given segments of a log file and returns the items read.
   * This is used to test the WAL writer independently of the WAL reader.
   */
-  def readDataFromLogManually(file: File, segments: Seq[FileSegment]): Seq[String] = {
+  def readDataManually(file: File, segments: Seq[FileSegment]): Seq[String] = {
     val reader = new RandomAccessFile(file, "r")
     println("File " + file + " has " + reader.length() + " bytes ")
     segments.map { x =>
@@ -167,22 +228,44 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
     }
   }
 
+  def readDataManually(file: File): Seq[String] = {
+    val reader = new DataInputStream(new FileInputStream(file))
+    val buffer = new ArrayBuffer[String]
+    try {
+      while (reader.available > 0) {
+        val length = reader.readInt()
+        val bytes = new Array[Byte](length)
+        reader.read(bytes)
+        buffer += Utils.deserialize[String](bytes)
+      }
+    } finally {
+      reader.close()
+    }
+    buffer
+  }
+
+  def readDataUsingManager(logDirectory: File): Seq[String] = {
+    val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf)
+    val data = manager.readFromLog().map(byteBufferToString).toSeq
+    manager.stop()
+    data
+  }
+
   def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
     (1 to numItems).map { _.toString }
   }
 
+  def getLogFilesInDirectory(directory: File): Seq[File] = {
+    println("[ " + directory.listFiles().filter(_.getName().startsWith("log-")).mkString(" | ") + " ]")
+    directory.listFiles().filter(_.getName().startsWith("log-"))
+      .sortBy(_.getName.split("-")(1).toLong)
+  }
+
   def printData(data: Seq[String]) {
     println("# items in data = " + data.size)
     println(data.mkString("\n"))
   }
 
-  def writeDataUsingWriter(file: File, input: Seq[String]): Seq[FileSegment] = {
-    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
-    val segments = input.map { data => writer.write(data) }
-    writer.close()
-    segments
-  }
-
   implicit def stringToByteBuffer(str: String): ByteBuffer = {
     ByteBuffer.wrap(Utils.serialize(str))
   }