Skip to content

Commit

Permalink
[SPARK-18617][SPARK-18560][TEST] Fix flaky test: StreamingContextSuit…
Browse files Browse the repository at this point in the history
…e. Receiver data should be deserialized properly

## What changes were proposed in this pull request?

Fixed the potential SparkContext leak in `StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` which was added in apache#16052. I also removed FakeByteArrayReceiver and used TestReceiver directly.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#16091 from zsxwing/SPARK-18617-follow-up.
  • Loading branch information
zsxwing authored and Robert Kruszewski committed Dec 15, 2016
1 parent cb1a44b commit e1ccf06
Showing 1 changed file with 0 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -900,31 +900,6 @@ object TestReceiver {
val counter = new AtomicInteger(1)
}

class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {

val data: Array[Byte] = "test".getBytes
var receivingThreadOption: Option[Thread] = None

override def onStart(): Unit = {
val thread = new Thread() {
override def run() {
logInfo("Receiving started")
while (!isStopped) {
store(data)
}
logInfo("Receiving stopped")
}
}
receivingThreadOption = Some(thread)
thread.start()
}

override def onStop(): Unit = {
// no clean to be done, the receiving thread should stop on it own, so just wait for it.
receivingThreadOption.foreach(_.join())
}
}

/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
Expand Down

0 comments on commit e1ccf06

Please sign in to comment.