From 796d4ca6ed2e420c73e1beab9518a324df74457c Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 24 Jan 2014 18:51:21 +0800 Subject: [PATCH 1/6] Add real Kafka streaming test Conflicts: external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala project/SparkBuild.scala --- external/kafka/pom.xml | 6 + .../streaming/kafka/JavaKafkaStreamSuite.java | 117 +++++++++--- .../streaming/kafka/KafkaStreamSuite.scala | 170 ++++++++++++++++-- 3 files changed, 257 insertions(+), 36 deletions(-) diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index daf03360bc5f5..7fe43834a5d8e 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -70,6 +70,12 @@ + + net.sf.jopt-simple + jopt-simple + 4.5 + test + org.scalatest scalatest_${scala.binary.version} diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 9f8046bf00f8f..641c17a9f4c08 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -17,31 +17,108 @@ package org.apache.spark.streaming.kafka; +import java.io.Serializable; import java.util.HashMap; +import java.util.List; -import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; -import org.junit.Test; -import com.google.common.collect.Maps; -import kafka.serializer.StringDecoder; -import org.apache.spark.storage.StorageLevel; +import scala.Predef; +import scala.Tuple2; +import scala.collection.JavaConverters; + +import junit.framework.Assert; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaStreamSuite; + +import org.junit.Test; +import org.junit.After; +import org.junit.Before; + +public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable { + private transient KafkaStreamSuite testSuite = new KafkaStreamSuite(); + + @Before + @Override + public void setUp() { + testSuite.beforeFunction(); + System.clearProperty("spark.driver.port"); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + } + + @After + @Override + public void tearDown() { + ssc.stop(); + ssc = null; + System.clearProperty("spark.driver.port"); + testSuite.afterFunction(); + } -public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { @Test public void testKafkaStream() { - HashMap topics = Maps.newHashMap(); - - // tests the API, does not actually test data receiving - JavaPairReceiverInputDStream test1 = - KafkaUtils.createStream(ssc, "localhost:12345", "group", topics); - JavaPairReceiverInputDStream test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, - StorageLevel.MEMORY_AND_DISK_SER_2()); - - HashMap kafkaParams = Maps.newHashMap(); - kafkaParams.put("zookeeper.connect", "localhost:12345"); - kafkaParams.put("group.id","consumer-group"); - 
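Two details of the new setUp() above are easy to miss: Spark's TestSuiteBase normally drives batches with a manual clock, so a test that receives real Kafka data has to switch to the wall clock (the spark.streaming.clock property set above), and broker startup is delegated to the Scala suite's beforeFunction()/afterFunction(). That lifecycle, condensed into a rough Scala sketch (EmbeddedZookeeper and brokerProps are the helpers this patch adds further down; this is an illustrative outline, not a verbatim copy of the suite):

    var zookeeper: EmbeddedZookeeper = _
    var zkClient: ZkClient = _
    var server: KafkaServer = _

    def beforeFunction() {
      // ZooKeeper first: the broker registers itself there on startup
      zookeeper = new EmbeddedZookeeper("localhost:2181")
      zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
      server = new KafkaServer(new KafkaConfig(brokerProps))
      server.startup()
    }

    def afterFunction() {
      // tear down in reverse order: broker before ZooKeeper
      server.shutdown()
      zkClient.close()
      zookeeper.shutdown()
    }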
JavaPairReceiverInputDStream test3 = KafkaUtils.createStream(ssc, - String.class, String.class, StringDecoder.class, StringDecoder.class, - kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); + String topic = "topic1"; + HashMap topics = new HashMap(); + topics.put(topic, 1); + + HashMap sent = new HashMap(); + sent.put("a", 5); + sent.put("b", 3); + sent.put("c", 10); + + JavaPairDStream stream = KafkaUtils.createStream(ssc, + testSuite.zkConnect(), + "group", + topics); + + final HashMap result = new HashMap(); + + JavaDStream words = stream.map( + new Function, String>() { + @Override + public String call(Tuple2 tuple2) throws Exception { + return tuple2._2(); + } + } + ); + + words.countByValue().foreachRDD( + new Function, Void>() { + @Override + public Void call(JavaPairRDD rdd) throws Exception { + List> ret = rdd.collect(); + for (Tuple2 r : ret) { + if (result.containsKey(r._1())) { + result.put(r._1(), result.get(r._1()) + r._2()); + } else { + result.put(r._1(), r._2()); + } + } + + return null; + } + } + ); + + ssc.start(); + + HashMap tmp = new HashMap(sent); + testSuite.produceAndSendTestMessage(topic, + JavaConverters.asScalaMapConverter(tmp).asScala().toMap( + Predef.>conforms() + )); + + ssc.awaitTermination(10000); + + Assert.assertEquals(sent.size(), result.size()); + for (String k : sent.keySet()) { + Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue()); + } } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index e6f2c4a5cf5d1..5bf6cefed9109 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -17,28 +17,166 @@ package org.apache.spark.streaming.kafka -import kafka.serializer.StringDecoder +import java.io.File +import java.net.InetSocketAddress +import java.util.{Properties, Random} + +import scala.collection.mutable + +import kafka.admin.CreateTopicCommand +import kafka.common.TopicAndPartition +import kafka.producer.{KeyedMessage, ProducerConfig, Producer} +import kafka.utils.ZKStringSerializer +import kafka.serializer.StringEncoder +import kafka.server.{KafkaConfig, KafkaServer} + import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.zookeeper.server.ZooKeeperServer +import org.apache.zookeeper.server.NIOServerCnxnFactory + +import org.I0Itec.zkclient.ZkClient class KafkaStreamSuite extends TestSuiteBase { + val zkConnect = "localhost:2181" + var zookeeper: EmbeddedZookeeper = _ + var zkClient: ZkClient = _ + val zkConnectionTimeout = 6000 + val zkSessionTimeout = 6000 + + val brokerPort = 9092 + val brokerProps = getBrokerConfig(brokerPort) + val brokerConf = new KafkaConfig(brokerProps) + var server: KafkaServer = _ + + override def beforeFunction() { + // Zookeeper server startup + zookeeper = new EmbeddedZookeeper(zkConnect) + zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + + // Kafka broker startup + server = new KafkaServer(brokerConf) + server.startup() + + super.beforeFunction() + } + + override def afterFunction() { + server.shutdown() + brokerConf.logDirs.foreach { f => KafkaStreamSuite.deleteDir(new File(f)) } + + zkClient.close() + zookeeper.shutdown() + + 
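The JavaConverters incantation in the Java test above is only collection bridging: produceAndSendTestMessage takes an immutable scala.collection.immutable.Map, so the java.util.HashMap is wrapped with asScala and then copied with toMap, with Predef.conforms() supplying the implicit evidence that toMap needs when invoked from Java. The same conversion written natively in Scala, as a minimal illustration:

    import scala.collection.JavaConverters._

    val javaMap = new java.util.HashMap[String, Int]()
    javaMap.put("a", 5)
    javaMap.put("b", 3)

    // asScala is a mutable view over the Java map; toMap takes an immutable snapshot
    val scalaMap: Map[String, Int] = javaMap.asScala.toMap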
super.afterFunction() + } test("kafka input stream") { val ssc = new StreamingContext(master, framework, batchDuration) - val topics = Map("my-topic" -> 1) - - // tests the API, does not actually test data receiving - val test1: ReceiverInputDStream[(String, String)] = - KafkaUtils.createStream(ssc, "localhost:1234", "group", topics) - val test2: ReceiverInputDStream[(String, String)] = - KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) - val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") - val test3: ReceiverInputDStream[(String, String)] = - KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) - - // TODO: Actually test receiving data + val topic = "topic1" + val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) + + val stream = KafkaUtils.createStream(ssc, zkConnect, "group", Map(topic -> 1)) + val result = new mutable.HashMap[String, Long]() + stream.map { case (k, v) => v } + .countByValue() + .foreachRDD { r => + val ret = r.collect() + ret.toMap.foreach { kv => + val count = result.getOrElseUpdate(kv._1, 0) + kv._2 + result.put(kv._1, count) + } + } + ssc.start() + produceAndSendTestMessage(topic, sent) + ssc.awaitTermination(10000) + + assert(sent.size === result.size) + sent.keys.foreach { k => assert(sent(k) === result(k).toInt) } + ssc.stop() } + + private def getBrokerConfig(port: Int): Properties = { + val props = new Properties() + props.put("broker.id", "0") + props. + put("host.name", "localhost") + props.put("port", port.toString) + props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath) + props.put("zookeeper.connect", zkConnect) + props.put("log.flush.interval.messages", "1") + props.put("replica.socket.timeout.ms", "1500") + props + } + + private def getProducerConfig(brokerList: String): Properties = { + val props = new Properties() + props.put("metadata.broker.list", brokerList) + props.put("serializer.class", classOf[StringEncoder].getName) + props + } + + private def createTestMessage(topic: String, sent: Map[String, Int]) + : Seq[KeyedMessage[String, String]] = { + val messages = for ((s, freq) <- sent; i <- 0 until freq) yield { + new KeyedMessage[String, String](topic, s) + } + messages.toSeq + } + + def produceAndSendTestMessage(topic: String, sent: Map[String, Int]) { + val brokerAddr = brokerConf.hostName + ":" + brokerConf.port + val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr))) + CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + + // wait until metadata is propagated + Thread.sleep(1000) + assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))) + + producer.send(createTestMessage(topic, sent): _*) + producer.close() + } +} + +object KafkaStreamSuite { + val random = new Random() + + def tmpDir(): File = { + val tmp = System.getProperty("java.io.tmpdir") + val f = new File(tmp, "spark-kafka-" + random.nextInt(1000)) + f.mkdirs() + f + } + + def deleteDir(file: File) { + if (file.isFile) { + file.delete() + } else { + for (f <- file.listFiles()) { + deleteDir(f) + } + file.delete() + } + } +} + +class EmbeddedZookeeper(val zkConnect: String) { + val random = new Random() + val snapshotDir = KafkaStreamSuite.tmpDir() + val logDir = KafkaStreamSuite.tmpDir() + + val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) + val(ip, port) = { + val splits = zkConnect.split(":") + (splits(0), splits(1).toInt) + } + 
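createTestMessage above turns the frequency map into one KeyedMessage per occurrence, so Map("a" -> 5, "b" -> 3, "c" -> 10) produces 18 individual Kafka messages. The expansion step in isolation, as a self-contained example:

    val sent = Map("a" -> 2, "b" -> 1)
    // one payload per occurrence of each key; map iteration order is unspecified
    val payloads = (for ((s, freq) <- sent; _ <- 0 until freq) yield s).toSeq
    // payloads now holds "a", "a", "b"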
val factory = new NIOServerCnxnFactory() + factory.configure(new InetSocketAddress(ip, port), 16) + factory.startup(zookeeper) + + def shutdown() { + factory.shutdown() + KafkaStreamSuite.deleteDir(snapshotDir) + KafkaStreamSuite.deleteDir(logDir) + } } From 860f649be9b3052688c1d6b4540458657bc2e759 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 10 Feb 2014 10:09:43 +0800 Subject: [PATCH 2/6] Minor style changes, and tests ignored due to flakiness Conflicts: project/SparkBuild.scala --- .../streaming/kafka/JavaKafkaStreamSuite.java | 4 ++-- .../streaming/kafka/KafkaStreamSuite.scala | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 641c17a9f4c08..f836aea083e52 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -34,9 +34,9 @@ import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.KafkaStreamSuite; import org.junit.Test; +import org.junit.Ignore; import org.junit.After; import org.junit.Before; @@ -61,7 +61,7 @@ public void tearDown() { testSuite.afterFunction(); } - @Test + @Ignore @Test public void testKafkaStream() { String topic = "topic1"; HashMap topics = new HashMap(); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 5bf6cefed9109..692c84835c289 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -51,12 +51,17 @@ class KafkaStreamSuite extends TestSuiteBase { override def beforeFunction() { // Zookeeper server startup zookeeper = new EmbeddedZookeeper(zkConnect) + logInfo("==================== 0 ====================") zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + logInfo("==================== 1 ====================") // Kafka broker startup server = new KafkaServer(brokerConf) + logInfo("==================== 2 ====================") server.startup() - + logInfo("==================== 3 ====================") + Thread.sleep(2000) + logInfo("==================== 4 ====================") super.beforeFunction() } @@ -70,7 +75,7 @@ class KafkaStreamSuite extends TestSuiteBase { super.afterFunction() } - test("kafka input stream") { + ignore("kafka input stream") { val ssc = new StreamingContext(master, framework, batchDuration) val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) @@ -99,8 +104,7 @@ class KafkaStreamSuite extends TestSuiteBase { private def getBrokerConfig(port: Int): Properties = { val props = new Properties() props.put("broker.id", "0") - props. 
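The numbered logInfo markers and bare Thread.sleep calls above are patch 2's stopgap for the startup races that made these tests flaky enough to ignore. The durable fix, introduced as waitUntilTrue in patch 4 below, is to poll for a condition with a deadline instead of sleeping a fixed time. The idiom, sketched (equivalent in spirit to the helper the patch adds, not a copy of it):

    def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
      val deadline = System.currentTimeMillis() + waitTime
      while (System.currentTimeMillis() < deadline) {
        if (condition()) return true   // condition met: stop waiting early
        Thread.sleep(waitTime.min(100L))
      }
      condition()                      // one final check at the deadline
    }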
- put("host.name", "localhost") + props.put("host.name", "localhost") props.put("port", port.toString) props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath) props.put("zookeeper.connect", zkConnect) @@ -128,12 +132,14 @@ class KafkaStreamSuite extends TestSuiteBase { val brokerAddr = brokerConf.hostName + ":" + brokerConf.port val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr))) CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") - + logInfo("==================== 5 ====================") // wait until metadata is propagated Thread.sleep(1000) assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))) - producer.send(createTestMessage(topic, sent): _*) + Thread.sleep(1000) + + logInfo("==================== 6 ====================") producer.close() } } From 45593105ba131366a1cee43e4c9c79e89735b5b7 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 30 Apr 2014 09:07:06 +0800 Subject: [PATCH 3/6] Minor changes for Kafka unit test --- .../apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 5 ++--- .../org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index f836aea083e52..dcad1a28b2054 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -36,7 +36,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.Test; -import org.junit.Ignore; import org.junit.After; import org.junit.Before; @@ -48,7 +47,7 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S public void setUp() { testSuite.beforeFunction(); System.clearProperty("spark.driver.port"); - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock"); + //System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); } @@ -61,7 +60,7 @@ public void tearDown() { testSuite.afterFunction(); } - @Ignore @Test + @Test public void testKafkaStream() { String topic = "topic1"; HashMap topics = new HashMap(); diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 692c84835c289..6cf3d78e7f0f1 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -48,6 +48,8 @@ class KafkaStreamSuite extends TestSuiteBase { val brokerConf = new KafkaConfig(brokerProps) var server: KafkaServer = _ + override def useManualClock = false + override def beforeFunction() { // Zookeeper server startup zookeeper = new EmbeddedZookeeper(zkConnect) @@ -75,7 +77,7 @@ class KafkaStreamSuite extends TestSuiteBase { super.afterFunction() } - ignore("kafka input stream") { + test("kafka input stream") { val ssc = new StreamingContext(master, framework, batchDuration) val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) From 5525f10006fe17e36b6d328a6e1bbc040d760e90 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 23 Jul 2014 09:42:40 
+0800 Subject: [PATCH 4/6] Fix flaky issue of Kafka real unit test --- external/kafka/pom.xml | 2 +- .../streaming/kafka/JavaKafkaStreamSuite.java | 13 +-- .../streaming/kafka/KafkaStreamSuite.scala | 86 +++++++++++++------ 3 files changed, 70 insertions(+), 31 deletions(-) diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 7fe43834a5d8e..2aee99949223a 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -73,7 +73,7 @@ net.sf.jopt-simple jopt-simple - 4.5 + 3.2 test diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index dcad1a28b2054..c67d63f5ff00a 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -61,10 +61,11 @@ public void tearDown() { } @Test - public void testKafkaStream() { + public void testKafkaStream() throws InterruptedException { String topic = "topic1"; HashMap topics = new HashMap(); topics.put(topic, 1); + testSuite.createTopic(topic); HashMap sent = new HashMap(); sent.put("a", 5); @@ -107,13 +108,15 @@ public Void call(JavaPairRDD rdd) throws Exception { ssc.start(); + // Sleep to let Receiver start first + Thread.sleep(3000); + HashMap tmp = new HashMap(sent); - testSuite.produceAndSendTestMessage(topic, + testSuite.produceAndSendMessage(topic, JavaConverters.asScalaMapConverter(tmp).asScala().toMap( - Predef.>conforms() - )); + Predef.>conforms())); - ssc.awaitTermination(10000); + ssc.awaitTermination(3000); Assert.assertEquals(sent.size(), result.size()); for (String k : sent.keySet()) { diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 6cf3d78e7f0f1..a697230d62582 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -27,26 +27,32 @@ import kafka.admin.CreateTopicCommand import kafka.common.TopicAndPartition import kafka.producer.{KeyedMessage, ProducerConfig, Producer} import kafka.utils.ZKStringSerializer -import kafka.serializer.StringEncoder +import kafka.serializer.{StringDecoder, StringEncoder} import kafka.server.{KafkaConfig, KafkaServer} -import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.I0Itec.zkclient.ZkClient + import org.apache.zookeeper.server.ZooKeeperServer import org.apache.zookeeper.server.NIOServerCnxnFactory -import org.I0Itec.zkclient.ZkClient +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} +import org.apache.spark.storage.StorageLevel class KafkaStreamSuite extends TestSuiteBase { + import KafkaStreamSuite._ + val zkConnect = "localhost:2181" - var zookeeper: EmbeddedZookeeper = _ - var zkClient: ZkClient = _ val zkConnectionTimeout = 6000 val zkSessionTimeout = 6000 val brokerPort = 9092 val brokerProps = getBrokerConfig(brokerPort) val brokerConf = new KafkaConfig(brokerProps) - var server: KafkaServer = _ + + protected var zookeeper: EmbeddedZookeeper = _ + protected var zkClient: ZkClient = _ + protected var server: KafkaServer = _ + protected var producer: Producer[String, String] = _ override def useManualClock = false @@ -68,8 +74,9 @@ class KafkaStreamSuite extends TestSuiteBase { } override def 
afterFunction() { + producer.close() server.shutdown() - brokerConf.logDirs.foreach { f => KafkaStreamSuite.deleteDir(new File(f)) } + brokerConf.logDirs.foreach { f => deleteDir(new File(f)) } zkClient.close() zookeeper.shutdown() @@ -77,12 +84,22 @@ class KafkaStreamSuite extends TestSuiteBase { super.afterFunction() } - test("kafka input stream") { + test("Kafka input stream") { val ssc = new StreamingContext(master, framework, batchDuration) val topic = "topic1" val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) - - val stream = KafkaUtils.createStream(ssc, zkConnect, "group", Map(topic -> 1)) + createTopic(topic) + produceAndSendMessage(topic, sent) + + val kafkaParams = Map("zookeeper.connect" -> zkConnect, + "group.id" -> s"test-consumer-${random.nextInt(10000)}", + "auto.offset.reset" -> "smallest") + + val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( + ssc, + kafkaParams, + Map(topic -> 1), + StorageLevel.MEMORY_ONLY) val result = new mutable.HashMap[String, Long]() stream.map { case (k, v) => v } .countByValue() @@ -94,8 +111,7 @@ class KafkaStreamSuite extends TestSuiteBase { } } ssc.start() - produceAndSendTestMessage(topic, sent) - ssc.awaitTermination(10000) + ssc.awaitTermination(3000) assert(sent.size === result.size) sent.keys.foreach { k => assert(sent(k) === result(k).toInt) } @@ -108,7 +124,7 @@ class KafkaStreamSuite extends TestSuiteBase { props.put("broker.id", "0") props.put("host.name", "localhost") props.put("port", port.toString) - props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath) + props.put("log.dir", createTmpDir().getAbsolutePath) props.put("zookeeper.connect", zkConnect) props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") @@ -130,28 +146,27 @@ class KafkaStreamSuite extends TestSuiteBase { messages.toSeq } - def produceAndSendTestMessage(topic: String, sent: Map[String, Int]) { - val brokerAddr = brokerConf.hostName + ":" + brokerConf.port - val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr))) + def createTopic(topic: String) { CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") logInfo("==================== 5 ====================") // wait until metadata is propagated - Thread.sleep(1000) - assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))) - producer.send(createTestMessage(topic, sent): _*) - Thread.sleep(1000) + waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000) + } + def produceAndSendMessage(topic: String, sent: Map[String, Int]) { + val brokerAddr = brokerConf.hostName + ":" + brokerConf.port + producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr))) + producer.send(createTestMessage(topic, sent): _*) logInfo("==================== 6 ====================") - producer.close() } } object KafkaStreamSuite { val random = new Random() - def tmpDir(): File = { + def createTmpDir(): File = { val tmp = System.getProperty("java.io.tmpdir") - val f = new File(tmp, "spark-kafka-" + random.nextInt(1000)) + val f = new File(tmp, "spark-kafka-" + random.nextInt(10000)) f.mkdirs() f } @@ -166,12 +181,33 @@ object KafkaStreamSuite { file.delete() } } + + def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = { + val startTime = System.currentTimeMillis() + while (true) { + if (condition()) + return true + if (System.currentTimeMillis() > startTime + waitTime) + return false + Thread.sleep(waitTime.min(100L)) + } + // Should never go to here + throw new 
RuntimeException("unexpected error") + } + + def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, + timeout: Long) { + assert(waitUntilTrue(() => + servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains( + TopicAndPartition(topic, partition))), timeout), + s"Partition [$topic, $partition] metadata not propagated after timeout") + } } class EmbeddedZookeeper(val zkConnect: String) { val random = new Random() - val snapshotDir = KafkaStreamSuite.tmpDir() - val logDir = KafkaStreamSuite.tmpDir() + val snapshotDir = KafkaStreamSuite.createTmpDir() + val logDir = KafkaStreamSuite.createTmpDir() val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) val(ip, port) = { From 5222330879f7310e424a334a568b7fbd89790e9e Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sun, 3 Aug 2014 16:26:32 +0800 Subject: [PATCH 5/6] Change JavaKafkaStreamSuite to better test it --- .../streaming/kafka/JavaKafkaStreamSuite.java | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index c67d63f5ff00a..3f2e9b2b19e72 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -27,8 +27,11 @@ import junit.framework.Assert; +import kafka.serializer.StringDecoder; + import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; +import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; @@ -65,17 +68,31 @@ public void testKafkaStream() throws InterruptedException { String topic = "topic1"; HashMap topics = new HashMap(); topics.put(topic, 1); - testSuite.createTopic(topic); HashMap sent = new HashMap(); sent.put("a", 5); sent.put("b", 3); sent.put("c", 10); + testSuite.createTopic(topic); + HashMap tmp = new HashMap(sent); + testSuite.produceAndSendMessage(topic, + JavaConverters.asScalaMapConverter(tmp).asScala().toMap( + Predef.>conforms())); + + HashMap kafkaParams = new HashMap(); + kafkaParams.put("zookeeper.connect", testSuite.zkConnect()); + kafkaParams.put("group.id", "test-consumer-" + testSuite.random().nextInt(10000)); + kafkaParams.put("auto.offset.reset", "smallest"); + JavaPairDStream stream = KafkaUtils.createStream(ssc, - testSuite.zkConnect(), - "group", - topics); + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topics, + StorageLevel.MEMORY_ONLY_SER()); final HashMap result = new HashMap(); @@ -107,15 +124,6 @@ public Void call(JavaPairRDD rdd) throws Exception { ); ssc.start(); - - // Sleep to let Receiver start first - Thread.sleep(3000); - - HashMap tmp = new HashMap(sent); - testSuite.produceAndSendMessage(topic, - JavaConverters.asScalaMapConverter(tmp).asScala().toMap( - Predef.>conforms())); - ssc.awaitTermination(3000); Assert.assertEquals(sent.size(), result.size()); From b6a505f0dac732f4ced2aa319e0d83a838d79b87 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 5 Aug 2014 10:19:46 +0800 Subject: [PATCH 6/6] code refactor according to comments --- .../streaming/kafka/JavaKafkaStreamSuite.java | 4 +- .../streaming/kafka/KafkaStreamSuite.scala | 93 ++++++++----------- 2 files 
changed, 40 insertions(+), 57 deletions(-) diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index 3f2e9b2b19e72..0571454c01dae 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -77,12 +77,12 @@ public void testKafkaStream() throws InterruptedException { testSuite.createTopic(topic); HashMap tmp = new HashMap(sent); testSuite.produceAndSendMessage(topic, - JavaConverters.asScalaMapConverter(tmp).asScala().toMap( + JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap( Predef.>conforms())); HashMap kafkaParams = new HashMap(); kafkaParams.put("zookeeper.connect", testSuite.zkConnect()); - kafkaParams.put("group.id", "test-consumer-" + testSuite.random().nextInt(10000)); + kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000)); kafkaParams.put("auto.offset.reset", "smallest"); JavaPairDStream stream = KafkaUtils.createStream(ssc, diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index a697230d62582..c0b55e9340253 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -37,16 +37,17 @@ import org.apache.zookeeper.server.NIOServerCnxnFactory import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils class KafkaStreamSuite extends TestSuiteBase { - import KafkaStreamSuite._ + import KafkaTestUtils._ val zkConnect = "localhost:2181" val zkConnectionTimeout = 6000 val zkSessionTimeout = 6000 val brokerPort = 9092 - val brokerProps = getBrokerConfig(brokerPort) + val brokerProps = getBrokerConfig(brokerPort, zkConnect) val brokerConf = new KafkaConfig(brokerProps) protected var zookeeper: EmbeddedZookeeper = _ @@ -76,7 +77,7 @@ class KafkaStreamSuite extends TestSuiteBase { override def afterFunction() { producer.close() server.shutdown() - brokerConf.logDirs.foreach { f => deleteDir(new File(f)) } + brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } zkClient.close() zookeeper.shutdown() @@ -119,25 +120,6 @@ class KafkaStreamSuite extends TestSuiteBase { ssc.stop() } - private def getBrokerConfig(port: Int): Properties = { - val props = new Properties() - props.put("broker.id", "0") - props.put("host.name", "localhost") - props.put("port", port.toString) - props.put("log.dir", createTmpDir().getAbsolutePath) - props.put("zookeeper.connect", zkConnect) - props.put("log.flush.interval.messages", "1") - props.put("replica.socket.timeout.ms", "1500") - props - } - - private def getProducerConfig(brokerList: String): Properties = { - val props = new Properties() - props.put("metadata.broker.list", brokerList) - props.put("serializer.class", classOf[StringEncoder].getName) - props - } - private def createTestMessage(topic: String, sent: Map[String, Int]) : Seq[KeyedMessage[String, String]] = { val messages = for ((s, freq) <- sent; i <- 0 until freq) yield { @@ -161,25 +143,26 @@ class KafkaStreamSuite extends TestSuiteBase { } } -object KafkaStreamSuite { +object KafkaTestUtils { val random = new 
Random() - def createTmpDir(): File = { - val tmp = System.getProperty("java.io.tmpdir") - val f = new File(tmp, "spark-kafka-" + random.nextInt(10000)) - f.mkdirs() - f + def getBrokerConfig(port: Int, zkConnect: String): Properties = { + val props = new Properties() + props.put("broker.id", "0") + props.put("host.name", "localhost") + props.put("port", port.toString) + props.put("log.dir", Utils.createTempDir().getAbsolutePath) + props.put("zookeeper.connect", zkConnect) + props.put("log.flush.interval.messages", "1") + props.put("replica.socket.timeout.ms", "1500") + props } - def deleteDir(file: File) { - if (file.isFile) { - file.delete() - } else { - for (f <- file.listFiles()) { - deleteDir(f) - } - file.delete() - } + def getProducerConfig(brokerList: String): Properties = { + val props = new Properties() + props.put("metadata.broker.list", brokerList) + props.put("serializer.class", classOf[StringEncoder].getName) + props } def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = { @@ -202,25 +185,25 @@ object KafkaStreamSuite { TopicAndPartition(topic, partition))), timeout), s"Partition [$topic, $partition] metadata not propagated after timeout") } -} -class EmbeddedZookeeper(val zkConnect: String) { - val random = new Random() - val snapshotDir = KafkaStreamSuite.createTmpDir() - val logDir = KafkaStreamSuite.createTmpDir() + class EmbeddedZookeeper(val zkConnect: String) { + val random = new Random() + val snapshotDir = Utils.createTempDir() + val logDir = Utils.createTempDir() - val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) - val(ip, port) = { - val splits = zkConnect.split(":") - (splits(0), splits(1).toInt) - } - val factory = new NIOServerCnxnFactory() - factory.configure(new InetSocketAddress(ip, port), 16) - factory.startup(zookeeper) - - def shutdown() { - factory.shutdown() - KafkaStreamSuite.deleteDir(snapshotDir) - KafkaStreamSuite.deleteDir(logDir) + val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) + val (ip, port) = { + val splits = zkConnect.split(":") + (splits(0), splits(1).toInt) + } + val factory = new NIOServerCnxnFactory() + factory.configure(new InetSocketAddress(ip, port), 16) + factory.startup(zookeeper) + + def shutdown() { + factory.shutdown() + Utils.deleteRecursively(snapshotDir) + Utils.deleteRecursively(logDir) + } } }
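The ordering these helpers encode is the heart of the de-flaking: a freshly created topic is not immediately usable, because partition leadership has to propagate to the broker's metadata cache before a producer can send. A usage sketch combining the helpers defined above (illustrative glue that mixes suite fields like zkClient and server with KafkaTestUtils, not code from the patch):

    // create the topic, block until partition 0 has a leader, then produce
    CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1, "0")
    KafkaTestUtils.waitUntilMetadataIsPropagated(Seq(server), "topic1", 0, 1000)
    produceAndSendMessage("topic1", Map("a" -> 5, "b" -> 3, "c" -> 10))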
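Condensing the whole series, the shape that finally made the test deterministic: every message is already in Kafka before the streaming context starts, and auto.offset.reset=smallest makes the consumer replay the log from the earliest offset, so nothing depends on how quickly the receiver registers. A recap of the final Scala test flow (assuming import KafkaTestUtils._, as the suite does):

    createTopic(topic)
    produceAndSendMessage(topic, sent)

    val kafkaParams = Map(
      "zookeeper.connect" -> zkConnect,
      "group.id" -> s"test-consumer-${random.nextInt(10000)}",  // fresh group per run
      "auto.offset.reset" -> "smallest")                        // replay from the earliest offset

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

    ssc.start()
    ssc.awaitTermination(3000)  // batches tick on the wall clock (useManualClock = false)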