From 0a811210f809eb5b80eae14694d484d45b48b3f6 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 30 Nov 2016 17:41:43 -0800
Subject: [PATCH] [SPARK-18617][SPARK-18560][TEST] Fix flaky test:
 StreamingContextSuite. Receiver data should be deserialized properly

## What changes were proposed in this pull request?

Fixed the potential SparkContext leak in `StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly`, which was added in #16052. I also removed FakeByteArrayReceiver and used TestReceiver directly.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu

Closes #16091 from zsxwing/SPARK-18617-follow-up.
---
 .../streaming/StreamingContextSuite.scala | 34 +++++--------
 1 file changed, 8 insertions(+), 26 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 45d8f50853431..35eeb9dfa5ef8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.{File, NotSerializableException}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.ArrayBuffer
@@ -811,7 +812,8 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     // other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560.
     val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
     ssc = new StreamingContext(conf, Milliseconds(100))
-    val input = ssc.receiverStream(new FakeByteArrayReceiver)
+    val input = ssc.receiverStream(new TestReceiver)
+    val latch = new CountDownLatch(1)
     input.count().foreachRDD { rdd =>
       // Make sure we can read from BlockRDD
       if (rdd.collect().headOption.getOrElse(0L) > 0) {
@@ -820,12 +822,17 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
           setDaemon(true)
           override def run(): Unit = {
             ssc.stop(stopSparkContext = true, stopGracefully = false)
+            latch.countDown()
           }
         }.start()
       }
     }
     ssc.start()
     ssc.awaitTerminationOrTimeout(60000)
+    // Wait until `ssc.stop` returns. Otherwise, we may finish this test too fast and leak an active
+    // SparkContext. Note: the stop codes in `after` will just do nothing if `ssc.stop` in this test
+    // is running.
+    assert(latch.await(60, TimeUnit.SECONDS))
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {
@@ -891,31 +898,6 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }
 
-class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
-
-  val data: Array[Byte] = "test".getBytes
-  var receivingThreadOption: Option[Thread] = None
-
-  override def onStart(): Unit = {
-    val thread = new Thread() {
-      override def run() {
-        logInfo("Receiving started")
-        while (!isStopped) {
-          store(data)
-        }
-        logInfo("Receiving stopped")
-      }
-    }
-    receivingThreadOption = Some(thread)
-    thread.start()
-  }
-
-  override def onStop(): Unit = {
-    // no clean to be done, the receiving thread should stop on it own, so just wait for it.
-    receivingThreadOption.foreach(_.join())
-  }
-}
-
 /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
 class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
   extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
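
The fix hinges on a small concurrency pattern: `ssc.stop` runs on a fire-and-forget daemon thread created inside the `foreachRDD` callback, so the test must not return until that thread signals completion, otherwise the `SparkContext` outlives the test. Below is a minimal, self-contained Scala sketch of that latch hand-off, with no Spark dependencies; the `slowCleanup` helper is a hypothetical stand-in for `ssc.stop`.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHandOffSketch {
  // Hypothetical stand-in for `ssc.stop(...)`: any slow, asynchronous cleanup.
  private def slowCleanup(): Unit = Thread.sleep(500)

  def main(args: Array[String]): Unit = {
    val latch = new CountDownLatch(1)

    // Fire-and-forget cleanup on a daemon thread, as in the test's foreachRDD
    // callback. Daemon threads do not keep the JVM alive, so without the latch
    // the caller could finish while the cleanup is still in flight.
    new Thread() {
      setDaemon(true)
      override def run(): Unit = {
        slowCleanup()
        latch.countDown() // signal that cleanup has fully completed
      }
    }.start()

    // Mirrors the test's `assert(latch.await(60, TimeUnit.SECONDS))`: block
    // until the daemon thread confirms completion, or fail after the timeout.
    assert(latch.await(5, TimeUnit.SECONDS), "cleanup did not finish in time")
    println("cleanup finished; safe to exit")
  }
}
```

A `CountDownLatch` fits here better than `Thread.join` because the test body never holds a reference to the anonymous thread created inside the `foreachRDD` closure; the latch is the only shared handle between the two.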