
Commit

Merge pull request #18 from harishreedharan/driver-ha-wal
Fix tests to not ignore ordering and also assert all data is present
tdas committed Oct 23, 2014
2 parents ef8db09 + 82ce56e commit eb356ca
Showing 1 changed file with 24 additions and 23 deletions.
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -41,14 +41,14 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
val hadoopConf = new Configuration()
val dfsDir = Files.createTempDir()
val TEST_BUILD_DATA_KEY: String = "test.build.data"
-val oldTestBuildDataProp = System.getProperty(TEST_BUILD_DATA_KEY)
+val oldTestBuildDataProp = Option(System.getProperty(TEST_BUILD_DATA_KEY))
+System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
val nnPort = cluster.getNameNode.getNameNodeAddress.getPort
val hdfsUrl = s"hdfs://localhost:$nnPort/${getRandomString()}/"
var pathForTest: String = null

override def beforeAll() {
-System.setProperty(TEST_BUILD_DATA_KEY, dfsDir.toString)
cluster.waitActive()
}

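The `Option` wrapper above exists so that `afterAll` can restore the property safely: `System.setProperty` throws a `NullPointerException` when handed a `null` value, so the old value is only put back if one was present. A minimal sketch of the save-and-restore pattern, with an illustrative key and directory, plus a `clearProperty` branch for the unset case that the diff itself leaves out:

```scala
// Save, override, and restore a system property around a test run.
val key = "test.build.data"                 // illustrative key
val saved = Option(System.getProperty(key)) // None when the property is unset

System.setProperty(key, "/tmp/test-dfs-dir")
try {
  // ... exercise the MiniDFSCluster here ...
} finally {
  saved match {
    case Some(old) => System.setProperty(key, old) // restore, as afterAll does
    case None      => System.clearProperty(key)    // extra: remove it entirely
  }
}
```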
@@ -59,7 +59,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
override def afterAll() {
cluster.shutdown()
FileUtils.deleteDirectory(dfsDir)
-System.setProperty(TEST_BUILD_DATA_KEY, oldTestBuildDataProp)
+oldTestBuildDataProp.foreach(System.setProperty(TEST_BUILD_DATA_KEY, _))
}

test("WriteAheadLogWriter - writing data") {
@@ -71,8 +71,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
assert(writtenData.toArray === dataToWrite.toArray)
}

test("WriteAheadLogWriter - syncing of data by writing and reading immediately using " +
"Minicluster") {
test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
val dataToWrite = generateRandomData()
val writer = new WriteAheadLogWriter(pathForTest, hadoopConf)
dataToWrite.foreach { data =>
@@ -98,7 +97,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
reader.close()
}

test("WriteAheadLogReader - sequentially reading data written with writer using Minicluster") {
test("WriteAheadLogReader - sequentially reading data written with writer") {
// Write data manually for testing the sequential reader
val dataToWrite = generateRandomData()
writeDataUsingWriter(pathForTest, dataToWrite)
@@ -124,8 +123,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
reader.close()
}

test("WriteAheadLogRandomReader - reading data using random reader written with writer using " +
"Minicluster") {
test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
// Write data using writer for testing the random reader
val data = generateRandomData()
val segments = writeDataUsingWriter(pathForTest, data)
@@ -141,24 +139,23 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte

test("WriteAheadLogManager - write rotating logs") {
// Write data using manager
-val dataToWrite = generateRandomData(10)
+val dataToWrite = generateRandomData()
val dir = pathForTest
writeDataUsingManager(dir, dataToWrite)

// Read data manually to verify the written data
val logFiles = getLogFilesInDirectory(dir)
assert(logFiles.size > 1)
-val writtenData = logFiles.flatMap { file => readDataManually(file) }
-assert(writtenData.toSet === dataToWrite.toSet)
+val writtenData = logFiles.flatMap { file => readDataManually(file)}
+assert(writtenData.toList === dataToWrite.toList)
}

-// This one is failing right now -- commenting out for now.
test("WriteAheadLogManager - read rotating logs") {
// Write data manually for testing reading through manager
val dir = pathForTest
val writtenData = (1 to 10).map { i =>
-val data = generateRandomData(10)
-val file = dir + "/log-" + i
+val data = generateRandomData()
+val file = dir + s"/log-$i-$i"
writeDataManually(data, file)
data
}.flatten
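The `toSet` to `toList` switch in the hunk above is the core of this commit's "not ignore ordering" fix: a set comparison discards both record order and duplicates, so a reader that shuffled records or silently dropped a repeated one would still pass. A toy illustration:

```scala
// A set-based assert hides reordering and lost duplicates; a list-based one does not.
val written  = Seq("a", "b", "b", "c")
val readBack = Seq("c", "b", "a")            // wrong order, one "b" missing

assert(readBack.toSet == written.toSet)      // passes: both are Set("a", "b", "c")
// assert(readBack.toList == written.toList) // would fail, which is the point
```

The renamed files (`log-$i-$i` instead of `log-$i`) appear to match a `log-<startTime>-<endTime>` naming scheme that the manager orders on during recovery; that reading is inferred from the test data, not confirmed by this diff.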
@@ -169,12 +166,12 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte

// Read data using manager and verify
val readData = readDataUsingManager(dir)
-// assert(readData.toList === writtenData.toList)
+assert(readData.toList === writtenData.toList)
}

test("WriteAheadLogManager - recover past logs when creating new manager") {
// Write data with manager, recover with new manager and verify
-val dataToWrite = generateRandomData(100)
+val dataToWrite = generateRandomData()
val dir = pathForTest
writeDataUsingManager(dir, dataToWrite)
val logFiles = getLogFilesInDirectory(dir)
@@ -186,7 +183,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
test("WriteAheadLogManager - cleanup old logs") {
// Write data with manager, recover with new manager and verify
val dir = pathForTest
-val dataToWrite = generateRandomData(100)
+val dataToWrite = generateRandomData()
val fakeClock = new ManualClock
val manager = new WriteAheadLogManager(dir, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
@@ -201,7 +198,6 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter with BeforeAndAfte
assert(getLogFilesInDirectory(dir).size < logFiles.size)
}
}
-
// TODO (Hari, TD): Test different failure conditions of writers and readers.
// - Failure while reading incomplete/corrupt file
}
@@ -243,8 +239,10 @@ object WriteAheadLogSuite {

def writeDataUsingManager(logDirectory: String, data: Seq[String]) {
val fakeClock = new ManualClock
+fakeClock.setTime(1000000)
val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
+// Ensure that 500 does not get sorted after 2000, so put a high base value.
data.foreach { item =>
fakeClock.addToTime(500)
manager.writeToLog(item)
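The new `setTime(1000000)` call pairs with the added comment: if file names embed these clock values and are ordered as strings, a three-digit timestamp like `500` sorts after `2000`. Starting at a high base keeps every timestamp the same width, so string order and numeric order agree. A small demonstration of the failure mode being avoided (the string-sorting assumption is inferred from the comment in the diff):

```scala
// Lexicographic sorting misorders numbers of different widths...
val ts = Seq(500, 1000, 2000)
assert(ts.map(_.toString).sorted == List("1000", "2000", "500")) // "500" last!

// ...while a high base value keeps all timestamps the same width,
// so string order coincides with numeric order.
val based = ts.map(t => (t + 1000000).toString)
assert(based.sorted == based)
```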
@@ -271,7 +269,8 @@ object WriteAheadLogSuite {
val reader = HdfsUtils.getInputStream(file, hadoopConf)
val buffer = new ArrayBuffer[String]
try {
-while (true) { // Read till EOF is thrown
+while (true) {
+  // Read till EOF is thrown
val length = reader.readInt()
val bytes = new Array[Byte](length)
reader.read(bytes)
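The loop above reads length-prefixed records until `readInt()` hits the end of the stream; the `EOFException` handler presumably sits just below this excerpt. A self-contained sketch of the same pattern over a plain `DataInputStream` (standing in for the `HdfsUtils.getInputStream` helper), using `readFully`, which unlike `read` cannot return a partially filled buffer:

```scala
import java.io.{DataInputStream, EOFException}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

// Read length-prefixed string records until the stream is exhausted.
def readAllRecords(in: DataInputStream): Seq[String] = {
  val buffer = new ArrayBuffer[String]
  try {
    while (true) {              // exits via EOFException from readInt()
      val length = in.readInt() // 4-byte length prefix
      val bytes = new Array[Byte](length)
      in.readFully(bytes)       // fills the whole array or throws
      buffer += new String(bytes, StandardCharsets.UTF_8)
    }
  } catch {
    case _: EOFException => // expected: end of log file
  }
  buffer
}
```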
@@ -293,8 +292,10 @@ object WriteAheadLogSuite {
data
}

-def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
-  (1 to numItems).map { _.toString }
+def generateRandomData(): Seq[String] = {
+  (1 to 50).map {
+    _.toString
+  }
}

def getLogFilesInDirectory(directory: String): Seq[String] = {
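Despite its name, the simplified `generateRandomData` above is now deterministic: it returns the fixed strings `"1"` through `"50"`. Distinct, ordered, repeatable data is exactly what the new list-based asserts need, since any dropped or reordered record changes the resulting list:

```scala
val data = (1 to 50).map(_.toString)
assert(data.distinct == data) // no duplicates that could mask a lost record
assert(data != data.drop(1))  // losing a single record is detectable
assert(data != data.reverse)  // and so is any reordering
```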
