Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
uncleGen committed Nov 30, 2016
1 parent cd7e595 commit 39b4867
Showing 1 changed file with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,28 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
ssc.stop()
}

test("SPARK-18560 Receiver data should be deserialized properly.") {
// Start a two nodes cluster, so receiver will use one node, and Spark jobs will use the
// other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560.
val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
ssc = new StreamingContext(conf, Milliseconds(100))
val input = ssc.receiverStream(new FakeByteArrayReceiver)
input.count().foreachRDD { rdd =>
// Make sure we can read from BlockRDD
if (rdd.collect().headOption.getOrElse(0L) > 0) {
// Stop StreamingContext to unblock "awaitTerminationOrTimeout"
new Thread() {
setDaemon(true)
override def run(): Unit = {
ssc.stop(stopSparkContext = true, stopGracefully = false)
}
}.start()
}
}
ssc.start()
ssc.awaitTerminationOrTimeout(60000)
}

def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)
Expand Down Expand Up @@ -869,6 +891,31 @@ object TestReceiver {
val counter = new AtomicInteger(1)
}

class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {

val data: Array[Byte] = "test".getBytes
var receivingThreadOption: Option[Thread] = None

override def onStart(): Unit = {
val thread = new Thread() {
override def run() {
logInfo("Receiving started")
while (!isStopped) {
store(data)
}
logInfo("Receiving stopped")
}
}
receivingThreadOption = Some(thread)
thread.start()
}

override def onStop(): Unit = {
// no clean to be done, the receiving thread should stop on it own, so just wait for it.
receivingThreadOption.foreach(_.join())
}
}

/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
Expand Down

0 comments on commit 39b4867

Please sign in to comment.