
[SPARK-21960][Streaming] Spark Streaming Dynamic Allocation should respect spark.executor.instances #19183

Closed
wants to merge 1 commit

Conversation

karth295
Contributor

What changes were proposed in this pull request?

Removes check that spark.executor.instances is set to 0 when using Streaming DRA.
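
For reference, a minimal Scala sketch of the kind of guard being removed (simplified and paraphrased, not the exact upstream code):

```scala
import org.apache.spark.SparkConf

object RemovedGuardSketch {
  // Simplified sketch of the guard this patch removes: with streaming dynamic
  // allocation enabled, a non-zero spark.executor.instances used to fail fast.
  def validate(conf: SparkConf): Unit = {
    val numExecutors = conf.getInt("spark.executor.instances", 0)
    require(numExecutors == 0,
      "spark.executor.instances must be 0 when " +
        "spark.streaming.dynamicAllocation.enabled is true")
  }
}
```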

How was this patch tested?

Manual tests

My only concern with this PR is that spark.executor.instances (or the actual initial number of executors the cluster manager gives Spark) can fall outside the range between spark.streaming.dynamicAllocation.minExecutors and spark.streaming.dynamicAllocation.maxExecutors. I don't see a good way around that, because this code only runs after the SparkContext has been created.
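
To make that concern concrete, here is an illustrative (spark-shell style) configuration that would now be accepted even though the initial executor count sits outside the streaming DRA bounds:

```scala
import org.apache.spark.SparkConf

// Illustrative values only: nothing ties the initial executor count to the
// streaming DRA bounds, so 10 initial executors are accepted even though the
// allocation manager will only scale within [2, 5] once the job starts.
val conf = new SparkConf()
  .set("spark.executor.instances", "10")
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "2")
  .set("spark.streaming.dynamicAllocation.maxExecutors", "5")
```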

@ghost

ghost commented Mar 22, 2018

This is actually a good design solution. Right now it is not very clear (not even from the docs, examples, or Google searches) how to set an initial number of executors for a streaming application that has streaming dynamic allocation enabled. I consider this important because some streams, like Kinesis, need a minimum number of executors to match the number of shards.

Any quick workaround for this?

@sansagara

Just commenting to be subscribed!

@holdenk
Contributor

holdenk commented May 4, 2018

Jenkins, ok to test.

@holdenk
Contributor

holdenk commented May 4, 2018

@karth295 for validating that spark.executor.instances is in a valid range, could we check it in the config with ConfigBuilder's checkValue?
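
For context, a rough sketch of how that suggestion could look with Spark's internal ConfigBuilder API (entry names and bounds here are illustrative, and ConfigBuilder is private[spark], so the code would have to live inside that package):

```scala
package org.apache.spark.streaming

import org.apache.spark.internal.config.ConfigBuilder

// Illustrative sketch only: declare the streaming DRA bounds as config entries
// so checkValue rejects bad values as soon as the config is read.
private[spark] object StreamingDynAllocationConfigSketch {
  val MIN_EXECUTORS = ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors")
    .intConf
    .checkValue(_ > 0, "The minimum number of executors must be positive")
    .createOptional

  val MAX_EXECUTORS = ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors")
    .intConf
    .checkValue(_ > 0, "The maximum number of executors must be positive")
    .createWithDefault(Int.MaxValue)
}
```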

@holdenk
Contributor

holdenk commented May 4, 2018

One other point: the lack of tests leaves me a little concerned about this change. Maybe look at ./streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala and see if it would make sense to add something there?

This is a bit out of my usual range so I'm going to CC @koeninger to take a more detailed look.

@SparkQA

SparkQA commented May 5, 2018

Test build #90232 has finished for PR 19183 at commit 4c9769e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

I don't have personal experience with streaming dynamic allocation, but this patch makes sense to me and I don't see anything obviously wrong.

I agree with Holden regarding tests.

@sansagara

sansagara commented May 10, 2018

I can add a test to ExecutorAllocationManagerSuite.scala asserting that spark.executor.instances correctly allocates that number of executors initially when using streaming dynamic allocation, while still respecting the streaming min and max executors. Can't think of another test, TBH. What do you think?
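
A rough Scala sketch of what such a test might look like (suite name and the specific assertion are illustrative and assume the companion object's isDynamicAllocationEnabled helper; not the exact contents of ExecutorAllocationManagerSuite):

```scala
package org.apache.spark.streaming.scheduler

import org.apache.spark.{SparkConf, SparkFunSuite}

// Illustrative sketch: with the old guard gone, streaming dynamic allocation
// should be usable alongside an explicit spark.executor.instances setting.
class StreamingDRAInitialExecutorsSuite extends SparkFunSuite {

  test("streaming dynamic allocation tolerates spark.executor.instances") {
    val conf = new SparkConf()
      .set("spark.executor.instances", "3")
      .set("spark.streaming.dynamicAllocation.enabled", "true")
      .set("spark.streaming.dynamicAllocation.minExecutors", "1")
      .set("spark.streaming.dynamicAllocation.maxExecutors", "5")

    // Before this patch, this combination was rejected outright.
    assert(ExecutorAllocationManager.isDynamicAllocationEnabled(conf))
  }
}
```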

@karth295 karth295 closed this May 10, 2018
@karth295 karth295 reopened this May 10, 2018
@koeninger
Contributor

@sansagara sounds reasonable to me

@karth295
Contributor Author

@sansagara go for it -- it'll be a few days until I'll have time to look at this again. I'll close my PR if/when you make a new one :)

@skonto
Contributor

skonto commented May 28, 2018

@karth295 I think you could validate the config here, in SparkConf's validateSettings method. I guess that is also where you have all the required info, and it's not too late since it runs when the SparkContext gets initialized. I see other exceptions thrown there.
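
A hedged sketch of that idea, written as a standalone helper rather than as the actual SparkConf.validateSettings body (the helper name and the default values used here are assumptions):

```scala
import org.apache.spark.SparkConf

object StreamingDRAConfCheckSketch {
  // Hypothetical cross-check that could run while the SparkContext is being
  // initialized: if streaming DRA is on, require the initial executor count to
  // either be unset (0) or to fall within the streaming DRA bounds.
  def validate(conf: SparkConf): Unit = {
    if (conf.getBoolean("spark.streaming.dynamicAllocation.enabled", false)) {
      val initial = conf.getInt("spark.executor.instances", 0)
      val min = conf.getInt("spark.streaming.dynamicAllocation.minExecutors", 1)
      val max = conf.getInt("spark.streaming.dynamicAllocation.maxExecutors", Int.MaxValue)
      require(initial == 0 || (initial >= min && initial <= max),
        s"spark.executor.instances ($initial) should fall within [$min, $max] " +
          "when streaming dynamic allocation is enabled")
    }
  }
}
```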

@AmplabJenkins

Can one of the admins verify this patch?

@srowen
Member

srowen commented Jul 18, 2018

I thought this check also existed in the non-streaming code; the theory was that if you have set a fixed number of executors but enabled dynamic allocation, then that's probably a configuration error. But given that many people run on clusters with dynamic allocation defaulting to 'on' globally, that could be confusing or a little inconvenient to work around.

I don't think that check exists in the non-streaming code anymore though, and I see a test to that effect too. Therefore I think this is reasonable for consistency.

CC @tdas

@karth295
Contributor Author

@srowen ah, thanks for the background -- that does make sense.

@skonto I agree that the message should be logged when the SparkContext gets initialized, but I don't like the idea of putting it in validateSettings. Spark Core's ExecutorAllocationManager does its own validation on construction, and it is constructed while the SparkContext is getting initialized. I like that each module validates its own configs, rather than having the validation separate from the code.

Spark Streaming's ExecutorAllocationManager also validates on construction, but it gets created when a job starts, not when the StreamingContext is initialized. This is because it depends on two things that are only available when the job starts: the ReceiverTracker and the micro-batch interval.

I see two options:

  1. Initialize ExecutorAllocationManager excluding those two things, and pass them in start()
  2. Make validateSettings a static method and call it when creating the streaming context (or SparkConf), then construct the manager later.

I'm inclined to implement option 1 to avoid parsing the properties in two places -- constructor and validateSettings.
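
Roughly, option 1 might look like this (class and method shapes are illustrative, not the actual upstream code):

```scala
package org.apache.spark.streaming.scheduler

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration

// Illustrative sketch of option 1: validate the conf in the constructor so
// problems surface while the StreamingContext is being set up, and hand over
// the two late-bound dependencies -- the ReceiverTracker and the batch
// interval -- in start() once the job is running.
private[streaming] class ExecutorAllocationManagerSketch(conf: SparkConf) {

  validateSettings() // runs at construction time, i.e. at context setup

  private var receiverTracker: ReceiverTracker = _
  private var batchDuration: Duration = _

  def start(tracker: ReceiverTracker, batchInterval: Duration): Unit = {
    receiverTracker = tracker
    batchDuration = batchInterval
    // ... schedule the periodic scaling logic here ...
  }

  private def validateSettings(): Unit = {
    // parse and sanity-check the spark.streaming.dynamicAllocation.* settings
    // exactly once, instead of here and again in a separate static helper
  }
}
```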

Member

@srowen srowen left a comment


I'm OK with the change as-is, but OK with other solutions if you all think there's a better way.

@asfgit asfgit closed this in ee5a5a0 Jul 27, 2018