Fix flaky issue of Kafka real unit test
jerryshao committed Aug 3, 2014
1 parent 4559310 commit 5525f10
Showing 3 changed files with 70 additions and 31 deletions.
external/kafka/pom.xml (1 addition, 1 deletion)

@@ -73,7 +73,7 @@
     <dependency>
       <groupId>net.sf.jopt-simple</groupId>
       <artifactId>jopt-simple</artifactId>
-      <version>4.5</version>
+      <version>3.2</version>
       <scope>test</scope>
     </dependency>
     <dependency>
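Note: the jopt-simple test dependency is pinned back from 4.5 to 3.2, presumably to match the jopt-simple version that Kafka 0.8.x itself depends on, so the embedded KafkaServer launched by the test does not pick up an incompatible version from the test classpath.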
external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

@@ -61,10 +61,11 @@ public void tearDown() {
   }
 
   @Test
-  public void testKafkaStream() {
+  public void testKafkaStream() throws InterruptedException {
     String topic = "topic1";
     HashMap<String, Integer> topics = new HashMap<String, Integer>();
     topics.put(topic, 1);
+    testSuite.createTopic(topic);
 
     HashMap<String, Integer> sent = new HashMap<String, Integer>();
     sent.put("a", 5);
@@ -107,13 +108,15 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {

     ssc.start();
 
+    // Sleep to let Receiver start first
+    Thread.sleep(3000);
+
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    testSuite.produceAndSendTestMessage(topic,
+    testSuite.produceAndSendMessage(topic,
       JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
-        Predef.<Tuple2<String, Object>>conforms()
-      ));
+        Predef.<Tuple2<String, Object>>conforms()));
 
-    ssc.awaitTermination(10000);
+    ssc.awaitTermination(3000);
 
     Assert.assertEquals(sent.size(), result.size());
     for (String k : sent.keySet()) {
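Note: the added Thread.sleep(3000) is a fixed grace period that gives the receiver time to attach before any messages are produced; without it, messages sent while the receiver is still starting can be missed and the test sees an incomplete result. A condition-based wait, in the spirit of the waitUntilTrue helper added on the Scala side below, is the sturdier pattern. A minimal sketch in Scala, assuming nothing beyond the standard library (retryUntil is a hypothetical name, not part of this commit):

  // Poll a condition instead of sleeping for a fixed interval.
  // Returns true as soon as `condition` holds, false once `timeoutMs` elapses.
  def retryUntil(condition: () => Boolean, timeoutMs: Long, intervalMs: Long = 100L): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline && !condition()) {
      Thread.sleep(intervalMs)
    }
    condition()
  }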
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

@@ -27,26 +27,32 @@ import kafka.admin.CreateTopicCommand
 import kafka.common.TopicAndPartition
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import kafka.utils.ZKStringSerializer
-import kafka.serializer.StringEncoder
+import kafka.serializer.{StringDecoder, StringEncoder}
 import kafka.server.{KafkaConfig, KafkaServer}
 
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
-import org.I0Itec.zkclient.ZkClient
-
 import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxnFactory
 
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+
 class KafkaStreamSuite extends TestSuiteBase {
+  import KafkaStreamSuite._
+
   val zkConnect = "localhost:2181"
-  var zookeeper: EmbeddedZookeeper = _
-  var zkClient: ZkClient = _
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
 
   val brokerPort = 9092
   val brokerProps = getBrokerConfig(brokerPort)
   val brokerConf = new KafkaConfig(brokerProps)
-  var server: KafkaServer = _
+
+  protected var zookeeper: EmbeddedZookeeper = _
+  protected var zkClient: ZkClient = _
+  protected var server: KafkaServer = _
+  protected var producer: Producer[String, String] = _
 
   override def useManualClock = false

@@ -68,21 +74,32 @@ class KafkaStreamSuite extends TestSuiteBase {
   }
 
   override def afterFunction() {
+    producer.close()
     server.shutdown()
-    brokerConf.logDirs.foreach { f => KafkaStreamSuite.deleteDir(new File(f)) }
+    brokerConf.logDirs.foreach { f => deleteDir(new File(f)) }
 
     zkClient.close()
     zookeeper.shutdown()
 
     super.afterFunction()
   }
 
-  test("kafka input stream") {
+  test("Kafka input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
 
-    val stream = KafkaUtils.createStream(ssc, zkConnect, "group", Map(topic -> 1))
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc,
+      kafkaParams,
+      Map(topic -> 1),
+      StorageLevel.MEMORY_ONLY)
     val result = new mutable.HashMap[String, Long]()
     stream.map { case (k, v) => v }
       .countByValue()
@@ -94,8 +111,7 @@ class KafkaStreamSuite extends TestSuiteBase {
       }
     }
     ssc.start()
-    produceAndSendTestMessage(topic, sent)
-    ssc.awaitTermination(10000)
+    ssc.awaitTermination(3000)
 
     assert(sent.size === result.size)
     sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
@@ -108,7 +124,7 @@ class KafkaStreamSuite extends TestSuiteBase {
props.put("broker.id", "0")
props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath)
props.put("log.dir", createTmpDir().getAbsolutePath)
props.put("zookeeper.connect", zkConnect)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
@@ -130,28 +146,27 @@ class KafkaStreamSuite extends TestSuiteBase {
     messages.toSeq
   }
 
-  def produceAndSendTestMessage(topic: String, sent: Map[String, Int]) {
-    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+  def createTopic(topic: String) {
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
-    Thread.sleep(1000)
-    assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)))
-    producer.send(createTestMessage(topic, sent): _*)
-    Thread.sleep(1000)
+    waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+  }
+
+  def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    producer.send(createTestMessage(topic, sent): _*)
     logInfo("==================== 6 ====================")
-    producer.close()
   }
 }
 
 object KafkaStreamSuite {
   val random = new Random()
 
-  def tmpDir(): File = {
+  def createTmpDir(): File = {
     val tmp = System.getProperty("java.io.tmpdir")
-    val f = new File(tmp, "spark-kafka-" + random.nextInt(1000))
+    val f = new File(tmp, "spark-kafka-" + random.nextInt(10000))
     f.mkdirs()
     f
   }
@@ -166,12 +181,33 @@ object KafkaStreamSuite {
       file.delete()
     }
   }
+
+  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
+    val startTime = System.currentTimeMillis()
+    while (true) {
+      if (condition())
+        return true
+      if (System.currentTimeMillis() > startTime + waitTime)
+        return false
+      Thread.sleep(waitTime.min(100L))
+    }
+    // Should never get here
+    throw new RuntimeException("unexpected error")
+  }
+
+  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
+      timeout: Long) {
+    assert(waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
+        TopicAndPartition(topic, partition))), timeout),
+      s"Partition [$topic, $partition] metadata not propagated after timeout")
+  }
 }
 
 class EmbeddedZookeeper(val zkConnect: String) {
   val random = new Random()
-  val snapshotDir = KafkaStreamSuite.tmpDir()
-  val logDir = KafkaStreamSuite.tmpDir()
+  val snapshotDir = KafkaStreamSuite.createTmpDir()
+  val logDir = KafkaStreamSuite.createTmpDir()
 
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
   val (ip, port) = {
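Note: a short usage sketch of the new helpers, for reference; it assumes the suite's own zkClient and server fields and an illustrative topic name:

  // Create the topic, then block until the broker's leader cache knows about it,
  // instead of sleeping for a fixed interval and hoping metadata has propagated.
  CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1, "0")
  KafkaStreamSuite.waitUntilMetadataIsPropagated(Seq(server), "topic1", 0, 1000)

  // The underlying primitive can also be used directly:
  val propagated = KafkaStreamSuite.waitUntilTrue(
    () => server.apis.leaderCache.keySet.contains(TopicAndPartition("topic1", 0)), 1000)
  assert(propagated, "metadata for [topic1, 0] not propagated in time")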
