Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
Fix style.
Browse files Browse the repository at this point in the history
  • Loading branch information
dragos committed Jul 28, 2015
1 parent a2eb3b9 commit 5125e60
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/
override protected[streaming] val rateController: Option[RateController] =
if (RateController.isBackPressureEnabled(ssc.conf))
override protected[streaming] val rateController: Option[RateController] = {
if (RateController.isBackPressureEnabled(ssc.conf)) {
RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
else
} else {
None
}
}

/**
* Gets the receiver object that will be sent to the worker nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class RateControllerSuite extends TestSuiteBase {
}
}

test("receiver rate controller updates reach receivers") {
test("publish rates reach receivers") {
val ssc = new StreamingContext(conf, batchDuration)
withStreamingContext(ssc) { ssc =>
val dstream = new RateLimitInputDStream(ssc) {
Expand All @@ -65,7 +65,7 @@ class RateControllerSuite extends TestSuiteBase {
}
}

test("multiple rate controller updates reach receivers") {
test("multiple publish rates reach receivers") {
val ssc = new StreamingContext(conf, batchDuration)
withStreamingContext(ssc) { ssc =>
val rates = Seq(100L, 200L, 300L)
Expand Down

0 comments on commit 5125e60

Please sign in to comment.