[SPARK-21960][Streaming] Spark Streaming Dynamic Allocation should respect spark.executor.instances #19183
Conversation
This is actually a good design solution. Right now it is not very clear (not even in the docs, examples, or Google searches) how to set an initial number of executors for a streaming application that has streaming dynamic allocation enabled. I consider this important because for some streams, like Kinesis, a minimum number of executors is needed to match the shards. Any quick workaround for this?
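For context, the kind of setup I'd want to express looks roughly like this (a sketch: the app name and numbers are illustrative, and it assumes the check this PR removes is gone, so `spark.executor.instances` may be nonzero alongside streaming DA):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative scenario: a Kinesis stream with 4 shards needs at least
// 4 executors so every shard can get a receiver.
val conf = new SparkConf()
  .setAppName("KinesisStreamingApp") // hypothetical app name
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "4")
  .set("spark.streaming.dynamicAllocation.maxExecutors", "16")
  // What this PR allows: an explicit initial executor count.
  .set("spark.executor.instances", "4")

val ssc = new StreamingContext(conf, Seconds(10))
```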
Just commenting to be subscribed!
Jenkins, ok to test.
@karth295 for validating that spark.executor.instances is in a valid range, could we look at it in the config with ConfigBuilder's checkValue?
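Something along these lines, perhaps (a sketch against Spark's internal `ConfigBuilder` API, not the actual entry definition):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Sketch: reject negative values at config-parse time instead of with an
// ad-hoc check later. The error message here is illustrative.
private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
  .intConf
  .checkValue(_ >= 0, "Number of executor instances must be nonnegative.")
  .createOptional
```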
Also, one point: the lack of tests leaves me a little concerned about this change. This is a bit out of my usual range, so I'm going to CC @koeninger to take a more detailed look.
Test build #90232 has finished for PR 19183 at commit
I don't have personal experience with streaming dynamic allocation, but this patch makes sense to me and I don't see anything obviously wrong. I agree with Holden regarding tests.
I can add a test in ExecutorAllocationManagerSuite.scala to assert that spark.executor.instances correctly allocates that number of executors initially when using streaming DA, while still respecting the streaming min and max executors. Can't think of another test, TBH. What do you think?
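Roughly, the property I'd want the test to pin down is this (sketch only; `initialExecutors` below is a hypothetical stand-in for the manager's internal logic, not a real method):

```scala
import org.scalatest.funsuite.AnyFunSuite

class InitialExecutorsSuite extends AnyFunSuite {

  // Stand-in for the manager's decision: start from spark.executor.instances
  // if set, otherwise from the streaming minimum, clamped into [min, max].
  def initialExecutors(instances: Option[Int], min: Int, max: Int): Int =
    math.min(math.max(instances.getOrElse(min), min), max)

  test("spark.executor.instances is respected within streaming DA bounds") {
    assert(initialExecutors(Some(4), min = 2, max = 10) == 4)   // honored
    assert(initialExecutors(Some(1), min = 2, max = 10) == 2)   // clamped up
    assert(initialExecutors(Some(50), min = 2, max = 10) == 10) // clamped down
    assert(initialExecutors(None, min = 2, max = 10) == 2)      // defaults to min
  }
}
```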
@sansagara sounds reasonable to me |
@sansagara go for it -- it'll be a few days until I'll have time to look at this again. I'll close my PR if/when you make a new one :) |
Can one of the admins verify this patch? |
I thought this check also existed in the non-streaming code; the theory was that if you have set a fixed number of executors but enabled dynamic allocation, then that's probably a configuration error. But given that many people run on clusters with dynamic allocation defaulting to 'on' globally, that could be confusing or a little inconvenient to work around. I don't think that check exists in the non-streaming code anymore though, and I see a test to that effect too. Therefore I think this is reasonable for consistency. CC @tdas |
@srowen ah, thanks for the background -- that does make sense. @skonto I agree that the message should be logged when the SparkContext gets initialized, but I don't like the idea of putting it in validateSettings. Spark Core's ExecutorAllocationManager does its own validation on construction, and it is constructed while the SparkContext is being initialized. I like that each module validates its own configs, rather than having validation separate from the code. Spark Streaming's ExecutorAllocationManager also validates on construction, but it gets created when a job starts, not when the StreamingContext is initialized. This is because it depends on two things that are only available when the job starts: the ReceiverTracker and the micro-batch interval. I see two options:
I'm inclined to implement option 1 to avoid parsing the properties in two places -- constructor and validateSettings. |
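For what it's worth, validating on construction would look something like this minimal sketch (the class name, defaults, and messages are illustrative, not Spark's actual code):

```scala
import org.apache.spark.SparkConf

// Sketch: the streaming allocation manager parses and validates its own
// configs exactly once, when it is constructed.
class StreamingAllocationManagerSketch(conf: SparkConf) {
  private val minExecutors =
    conf.getInt("spark.streaming.dynamicAllocation.minExecutors", 0)
  private val maxExecutors =
    conf.getInt("spark.streaming.dynamicAllocation.maxExecutors", Int.MaxValue)

  // Fail fast here rather than in a separate validateSettings pass.
  require(minExecutors >= 0, "minExecutors must be nonnegative")
  require(maxExecutors >= minExecutors, "maxExecutors must be >= minExecutors")
}
```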
I'm OK with the change as-is, but open to other solutions if you all think there's a better way.
What changes were proposed in this pull request?
Removes the check that spark.executor.instances is set to 0 when using Streaming DRA.

How was this patch tested?
Manual tests.
My only concern with this PR is that spark.executor.instances (or the actual initial number of executors that the cluster manager gives Spark) can be outside of spark.streaming.dynamicAllocation.minExecutors to spark.streaming.dynamicAllocation.maxExecutors. I don't see a good way around that, because this code only runs after the SparkContext has been created.
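Concretely, nothing rejects a configuration like this sketch (values illustrative), where the job would start at 20 executors and only drift into range once the allocation manager starts making decisions:

```scala
import org.apache.spark.SparkConf

val conflicting = new SparkConf()
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "2")
  .set("spark.streaming.dynamicAllocation.maxExecutors", "10")
  .set("spark.executor.instances", "20") // outside [2, 10], yet accepted
```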