[SPARK-8975][Streaming] Adds a mechanism to send a new rate from the driver to the block generator #7471
 * @param cond A boolean that should become `true`
 * @param timeout How many millis to wait before giving up
 */
def waitUntil(cond: => Boolean, timeout: Int): Unit = {
This method is general enough that it probably deserves a better place. I couldn't find anything similar in spark-core Utils or the streaming utilities (the closest one was in BatchCounter, but it doesn't do the same thing and relies on wait/notify). I'm open to suggestions on where to move it.
We have used `eventually` for doing this. See https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala#L742
No need to add another method here.
cool, thanks!
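For reference, the polling behaviour discussed in this thread can be sketched with the standard library alone. This is only an illustration of "retry a condition until it holds or a timeout expires": the body of the PR's `waitUntil` is not shown in the diff, so everything below (the `WaitUntilSketch` object, the `pollMillis` parameter, the `Boolean` return value) is an assumption, and the PR itself ended up using ScalaTest's `eventually` instead.

```scala
object WaitUntilSketch {
  /**
   * Polls `cond` until it becomes true or `timeoutMillis` elapses.
   * Returns true if the condition held before the deadline.
   * Hypothetical helper, not the code from the PR.
   */
  def waitUntil(cond: => Boolean, timeoutMillis: Long, pollMillis: Long = 10L): Boolean = {
    val deadline = System.nanoTime() + timeoutMillis * 1000000L
    var ok = cond
    // Compare via subtraction so the check stays correct if nanoTime wraps around.
    while (!ok && deadline - System.nanoTime() > 0) {
      Thread.sleep(pollMillis)
      ok = cond
    }
    ok
  }
}
```

Returning a `Boolean` rather than `Unit` lets the caller decide whether a timeout should fail the test.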
Test build #37634 has finished for PR 7471 at commit
1e7b210 to cd1397d
Test build #37638 has finished for PR 7471 at commit
@@ -271,7 +271,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
   }

   /** Get the attached executor. */
-  private def executor = {
+  private[streaming] def executor = {
Why this change? If you want to access private members, it's best to use the `PrivateTester`. See https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala#L866
I didn't know about `PrivateTester`, I'll look into it. Other Spark modules (such as core) are using these access modifiers, so I thought it was common practice.
`PrivateTester` won't work here. It assumes the private method is declared in the type of the receiver. It only works if the runtime type of the receiver is the type that declares the private method. Not the case here, where the method is declared in `Receiver`, but the invocation is on a concrete subclass (`DummyReceiver`). Relevant code here.
I could probably code something along those lines, but I think it's better to use qualified private. Are there any downsides to relying on language support for visibility? I doubt any user-code would declare things in `o.a.s.streaming`.
Aah, I see. The reason I wanted to use `PrivateTester` is that marking the method `private[streaming]` hides it from Scala code outside the package, but it is still public in the bytecode and therefore visible from Java. And since Receivers can be written in Java, I don't want to expose that. `PrivateTester` won't work, but the underlying mechanism, reflection, will:
scala> abstract class A { private val i = 1 }
defined class A
scala> class B extends A
defined class B
scala> B.getClass().getSuperclass()
<console>:8: error: not found: value B
B.getClass().getSuperclass()
^
scala> classOf[B].getSuperclass()
res1: Class[_ >: B] = class A
scala> classOf[B].getSuperclass().getDeclaredFields()
res2: Array[java.lang.reflect.Field] = Array(private final int A.i)
scala> val f = classOf[B].getSuperclass().getDeclaredFields().head
f: java.lang.reflect.Field = private final int A.i
scala> f.setAccessible(true)
scala> f.get(new B)
res4: Object = 1
You could use this to do the testing. Just make the DummyReceiver have a method that gets the underlying supervisor. Then use an object DummyReceiver (no need for a TestDummyReceiver class and object) for access to the supervisor.
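The same idea applies to a private method rather than a field. The sketch below is illustrative only: `Base`, `Concrete`, `secret` and `invokePrivate` are hypothetical names standing in for `Receiver`, `DummyReceiver` and `executor`. It looks the method up on the declaring class instead of the runtime class, which is the step that a lookup on the subclass misses.

```scala
object PrivateAccessSketch {
  // Hypothetical stand-ins for Receiver (declares the private method)
  // and DummyReceiver (the concrete runtime type).
  abstract class Base {
    private def secret: String = "from Base"
  }
  class Concrete extends Base

  /**
   * Invokes a no-argument private method that is declared on `declaring`,
   * using an instance whose runtime type may be any subclass.
   */
  def invokePrivate[T <: AnyRef](declaring: Class[T], target: T, name: String): AnyRef = {
    // getDeclaredMethod only sees methods declared by this exact class,
    // so the lookup must start from the declaring class, not target.getClass.
    val method = declaring.getDeclaredMethod(name)
    method.setAccessible(true)
    method.invoke(target)
  }
}
```

A call such as `PrivateAccessSketch.invokePrivate(classOf[PrivateAccessSketch.Base], new PrivateAccessSketch.Concrete, "secret")` reaches the method, whereas `classOf[PrivateAccessSketch.Concrete].getDeclaredMethod("secret")` throws `NoSuchMethodException`.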
See my last commit message about what changes I made for testing.
- made rate limit a Long and default to Long.MaxValue (consequence of the above)
- removed custom `waitUntil` and replaced it by `eventually`
7112cfd to 13ada97
As I mentioned before, I don’t think this is a great idea:
- such tests are flaky (original test in ReceiverSuite was ignored for that reason)
- Guava’s code has its own test suite, so we can assume it implements `setRate` correctly

I noticed one flaky failure in about 10 runs on my machine (receiver got 1 message less than the lower bound, which is within 5% of the nominal rate).
13ada97 to 0c51959
Test build #37830 has finished for PR 7471 at commit
Test build #37836 has finished for PR 7471 at commit
Test build #37835 has finished for PR 7471 at commit
@@ -17,6 +17,8 @@

package org.apache.spark.streaming.receiver

import java.util.concurrent.atomic.AtomicInteger
This is not needed any more. Make sure to remove the empty line below too.
Major things to change for the next iteration.
See other inline threads for smaller nits.
@@ -180,6 +180,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
    logError(s"Deregistered receiver for stream $streamId: $messageWithError")
  }

  /** Update a receiver's maximum ingestion rate */
  def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
    for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint))
nit: add `{` and `}` around the for-loop body
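A self-contained sketch of the braced form follows. The loop header is the one from the diff; the loop body is not visible in the hunk, so `UpdateRateLimit`, `Endpoint`, `ReceiverInfo` and `Tracker` are hypothetical stand-ins used only to make the example compile, not Spark's actual classes.

```scala
object RateUpdateSketch {
  // Hypothetical message and endpoint types, for illustration only.
  final case class UpdateRateLimit(newRate: Long)

  final class Endpoint {
    val received = scala.collection.mutable.ArrayBuffer.empty[Any]
    def send(msg: Any): Unit = received += msg
  }

  // `endpoint` may be null until the receiver has registered.
  final case class ReceiverInfo(endpoint: Endpoint)

  final class Tracker(receiverInfo: Map[Int, ReceiverInfo]) {
    /** Update a receiver's maximum ingestion rate. */
    def sendRateUpdate(streamUID: Int, newRate: Long): Unit = {
      // Runs the body only if the stream is known AND its endpoint is non-null.
      for (info <- receiverInfo.get(streamUID); eP <- Option(info.endpoint)) {
        eP.send(UpdateRateLimit(newRate))
      }
    }
  }
}
```

Wrapping `info.endpoint` in `Option(...)` turns a null endpoint into `None`, so an unknown stream and an unregistered receiver are both skipped silently.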
This reverts commit 0c51959.
use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
Test build #37950 has finished for PR 7471 at commit
@@ -72,8 +78,62 @@ class ReceiverTrackerSuite extends TestSuiteBase {
    assert(locations(0).length === 1)
    assert(locations(3).length === 1)
  }

  test("Receiver tracker - propagates rate limit") {
    object streamingListener extends StreamingListener {
This should be capitalized, say `TestStreamingListener` or, even better, `ReceiverWaiter`.
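One way such a `ReceiverWaiter` could be shaped is sketched below, assuming the test only needs to block until a known number of receivers have started before it asserts on the propagated rate. The `ListenerSketch` trait is a hypothetical stand-in for the listener interface and its callback signature is simplified; the PR's actual listener body is not shown in the hunk.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object ReceiverWaiterSketch {
  // Hypothetical, simplified stand-in for the streaming listener interface.
  trait ListenerSketch {
    def onReceiverStarted(streamId: Int): Unit
  }

  /** Blocks a test until `expected` receivers have reported that they started. */
  final class ReceiverWaiter(expected: Int) extends ListenerSketch {
    private val started = new CountDownLatch(expected)

    override def onReceiverStarted(streamId: Int): Unit = started.countDown()

    /** Returns true if all expected receivers started within the timeout. */
    def waitForReceivers(timeoutMillis: Long): Boolean =
      started.await(timeoutMillis, TimeUnit.MILLISECONDS)
  }
}
```

Waiting on an explicit "receiver started" event avoids asserting before registration has happened, which is the source of flakiness mentioned in the commit message above.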
Test build #38081 has finished for PR 7471 at commit
Spurious failure:
Building Spark
[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Pkinesis-asl -Phive-thriftserver -Phive package assembly/assembly streaming-kafka-assembly/assembly streaming-flume-assembly/assembly
Test build #1157 has finished for PR 7471 at commit
Same error 😢
Jenkins, test this please
Test build #1175 has finished for PR 7471 at commit
Test build #1177 has finished for PR 7471 at commit
Test build #1176 has finished for PR 7471 at commit
Test build #1179 has finished for PR 7471 at commit
LGTM. Merging this
…ements the RateController

Based on #7471.

- [x] add a test that exercises the publish path from driver to receiver
- [ ] remove Serializable from `RateController` and `RateEstimator`

Author: Iulian Dragos <[email protected]>
Author: François Garillot <[email protected]>

Closes #7600 from dragos/topic/streaming-bp/rate-controller and squashes the following commits:

f168c94 [Iulian Dragos] Latest review round.
5125e60 [Iulian Dragos] Fix style.
a2eb3b9 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
475e346 [Iulian Dragos] Latest round of reviews.
e9fb45e [Iulian Dragos] - Add a test for checkpointing - fixed serialization for RateController.executionContext
715437a [Iulian Dragos] Review comments and added a `reset` call in ReceiverTrackerTest.
e57c66b [Iulian Dragos] Added a couple of tests for the full scenario from driver to receivers, with several rate updates.
b425d32 [Iulian Dragos] Removed DeveloperAPI, removed rateEstimator field, removed Noop rate estimator, changed logic for initialising rate estimator.
238cfc6 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller
34a389d [Iulian Dragos] Various style changes and a first test for the rate controller.
d32ca36 [François Garillot] [SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController
8941cf9 [Iulian Dragos] Renames and other nitpicks.
162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior).
210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate."
0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate.
261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually`
cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers.
6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975
d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate
4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
First step for SPARK-7398.
@tdas @huitseeker