[SPARK-25692] [CORE] Remove static initialization of worker eventLoop handling chunk fetch requests within TransportContext. This fixes ChunkFetchIntegrationSuite as well #23700
@zsxwing @dongjoon-hyun Please take a look, as you already have context on this. Thanks.
ok to test
Retest this please.
The current run is on Currently, #23704 is suffering from
LGTM except one nit.
In addition, I noticed that we don't have any flag for this new feature. This is the first time we add it and it could have some bugs. Could you add it in another PR?
@zsxwing you can always disable this by setting spark.shuffle.server.chunkFetchHandlerThreadsPercent to 100. It is turned off by default. To use this feature, reduce spark.shuffle.server.chunkFetchHandlerThreadsPercent to, say, 80, which means: if I have 10 IO threads, use only 8 for chunk fetch requests and the rest for other RPC calls. This applies to the shuffle server.
@redsanket IIUC, all chunks are always handled in a separate event loop and there is no way to change it back to the original single-thread mode. Right?
Right, they are handled separately, but the behavior remains the same @zsxwing. We are running our production clusters with this and haven't seen stability issues yet.
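To make the arithmetic in the exchange above concrete, here is a minimal sketch of how a percentage setting could translate into a handler thread count. The class and method names (`ChunkFetchThreadMath`, `handlerThreads`) are hypothetical, and rounding up is an assumption of this sketch; it is not claimed to be Spark's actual `TransportConf` logic.

```java
public class ChunkFetchThreadMath {
    // Hypothetical helper: number of IO threads allowed to serve chunk fetch
    // requests, given the total IO thread count and the configured percentage.
    // Rounding up (integer ceiling) is an assumption made for this sketch.
    public static int handlerThreads(int ioThreads, int percent) {
        if (ioThreads <= 0 || percent < 0 || percent > 100) {
            throw new IllegalArgumentException("invalid thread count or percent");
        }
        return (ioThreads * percent + 99) / 100;
    }

    public static void main(String[] args) {
        // 80 percent of 10 IO threads leaves 2 threads for other RPC calls.
        System.out.println(handlerThreads(10, 80));
        // 100 percent means no threads are held back, i.e. no throttling.
        System.out.println(handlerThreads(10, 100));
    }
}
```

With 10 IO threads, a setting of 80 yields 8 chunk fetch threads, which matches the example in the comment; a setting of 100 yields all 10.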
Test build #101899 has finished for PR 23700 at commit
Retest this please
Test build #101912 has finished for PR 23700 at commit
I don't see that there is a behavior change here regarding event loops vs single thread?
But the constructor needs to be updated now; it doesn't need to synchronize on the class to initialize this.
The PR title needs to be more descriptive too.
(oops meant to click "Request Changes")
Nice catch, will update. Thanks @srowen
Retest this please
conf.chunkFetchHandlerThreads(),
"shuffle-chunk-fetch-handler");
}
if (chunkFetchWorkers == null &&
No need to check for null; it always is here
Yes, I think this was more of a defensive check from when the field was static. Will remove, thanks.
@@ -88,7 +88,7 @@
  // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
  // max number of TransportServer worker threads that are blocked on writing response
  // of ChunkFetchRequest message back to the client via the underlying channel.
- private static EventLoopGroup chunkFetchWorkers;
+ private EventLoopGroup chunkFetchWorkers;
This can be final, though will require explicitly setting it to null in the constructor if not otherwise initialized
Yes, it makes sense to make it final. Thanks, will do.
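The review suggestion above (a final instance field that is explicitly set to null when not otherwise initialized) could look roughly like the following sketch. It uses `java.util.concurrent.ExecutorService` as a stand-in for Netty's `EventLoopGroup` so that it runs without dependencies; `ContextSketch`, `isServerSide`, `handlerThreads`, and the `close()` behavior are all hypothetical and not taken from the actual `TransportContext`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextSketch implements AutoCloseable {
    // Instance-level and final: each context owns its own pool, or has none.
    private final ExecutorService chunkFetchWorkers;

    // `isServerSide` and `handlerThreads` are hypothetical parameters.
    public ContextSketch(boolean isServerSide, int handlerThreads) {
        if (isServerSide && handlerThreads > 0) {
            chunkFetchWorkers = Executors.newFixedThreadPool(handlerThreads);
        } else {
            // A final field must be assigned on every constructor path,
            // so the "no pool" case sets it to null explicitly.
            chunkFetchWorkers = null;
        }
    }

    public boolean hasChunkFetchWorkers() {
        return chunkFetchWorkers != null;
    }

    // Assumption of this sketch: closing the context releases its workers.
    @Override
    public void close() {
        if (chunkFetchWorkers != null) {
            chunkFetchWorkers.shutdownNow();
        }
    }
}
```

Because the field is assigned exactly once in the constructor, no class-level synchronization is needed to initialize it, which is the point raised earlier in the review.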
Retest this please
Test build #101952 has finished for PR 23700 at commit
Test build #101954 has finished for PR 23700 at commit
gentle ping @srowen @zsxwing @tgravescs @dongjoon-hyun thanks
Removing the static seems fine here. I am curious: if you put in an extra sleep(60) at the end of furtherRequestsDelay, does that work around it as well? I'm not advocating that as a fix, just making sure I understand and that nothing else is going on.
@tgravescs yes, that works too. I tested that as well, but I think removing static is for the best.
Merged to master
…handling chunk fetch requests within TransportContext. This fixes ChunkFetchIntegrationSuite as well

## What changes were proposed in this pull request?

How to reproduce
./build/mvn test -Dtest=org.apache.spark.network.RequestTimeoutIntegrationSuite,org.apache.spark.network.ChunkFetchIntegrationSuite -DwildcardSuites=None test

furtherRequestsDelay Test within RequestTimeoutIntegrationSuite was holding onto buffer references within worker threads. The test does close the server context but since the threads are global and there is sleep of 60 secs to fetch a specific chunk within this test, it grabs on it and waits for the client to consume but however the test is testing for a request timeout and it times out after 10 secs, so the workers are just waiting there for the buffer to be consumed by client as per my understanding.

This tends to happen if you dont have enough IO threads available on the specific system and also the order of the tests being run determines its flakyness like if ChunkFetchIntegrationSuite runs first then there is no issue. For example on mac with 8 threads these tests run fine but on my vm with 4 threads it fails. It matches the number of fetch calls in RequestTimeoutIntegrationSuite.

So do we really need it to be static?

I dont think this requires a global declaration as these threads are only required on the shuffle server end and on the client TransportContext initialization i.e the Client don't initialize these threads. The Shuffle Server initializes one TransportContext object. So, I think this is fine to be an instance variable and I see no harm.

## How was this patch tested?

Integration tests, manual tests

Closes apache#23700 from redsanket/SPARK-25692.

Authored-by: schintap <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
…handling chunk fetch requests within TransportContext. This fixes ChunkFetchIntegrationSuite as well
(cherry picked from commit 13c5634)
(cherry picked from commit 0957414d520225ed08dfdcaf383d9c825a969adc)
Change-Id: I7bb43d650915e3c99aba79b2f1183a72b4308985
(cherry picked from commit 02a2150957bf3ab167d69c00c3563cffb3ed005e)
What changes were proposed in this pull request?
How to reproduce
./build/mvn test -Dtest=org.apache.spark.network.RequestTimeoutIntegrationSuite,org.apache.spark.network.ChunkFetchIntegrationSuite -DwildcardSuites=None test
The furtherRequestsDelay test within RequestTimeoutIntegrationSuite was holding onto buffer references within worker threads. The test does close the server context, but because the threads are global and the test sleeps for 60 secs when fetching a specific chunk, a worker grabs the buffer and waits for the client to consume it. However, the test is checking for a request timeout, and the request times out after 10 secs, so the workers are just waiting there for the buffer to be consumed by the client, as per my understanding.
This tends to happen if you don't have enough IO threads available on the specific system, and the order in which the tests run also determines the flakiness: if ChunkFetchIntegrationSuite runs first, there is no issue. For example, on a Mac with 8 threads these tests run fine, but on my VM with 4 threads they fail. It matches the number of fetch calls in RequestTimeoutIntegrationSuite.
So do we really need it to be static?
I don't think this requires a global declaration, as these threads are only required on the shuffle server end; on client TransportContext initialization, i.e. on the client, these threads are not initialized. The shuffle server initializes one TransportContext object. So I think it is fine for this to be an instance variable, and I see no harm.
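The failure mode described above can be illustrated with a small, simplified sketch. It uses a plain `java.util.concurrent` thread pool as a stand-in for the Netty event loop group, and a latch as a stand-in for workers waiting on a client that has already timed out; `SharedPoolStarvation`, `canServe`, and `blockAllWorkers` are hypothetical names, and this is not the actual test code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SharedPoolStarvation {
    // Returns true if a newly submitted task completes on `pool` within the timeout.
    public static boolean canServe(ExecutorService pool, long timeoutMs) throws Exception {
        Future<String> f = pool.submit(() -> "chunk");
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            f.cancel(true);
            return false;
        }
    }

    // Occupies every worker of `pool` until `release` is counted down, standing in
    // for workers stuck waiting on a client that has already given up.
    public static void blockAllWorkers(ExecutorService pool, int threads,
                                       CountDownLatch release) throws InterruptedException {
        CountDownLatch started = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException ignored) {
                    // Interrupted during shutdown; nothing to clean up.
                }
            });
        }
        started.await(); // every worker is now busy
    }

    public static void main(String[] args) throws Exception {
        int threads = 4;
        CountDownLatch release = new CountDownLatch(1);

        // "static" case: the next suite reuses the pool an earlier suite left blocked.
        ExecutorService shared = Executors.newFixedThreadPool(threads);
        blockAllWorkers(shared, threads, release);
        System.out.println("shared pool serves next suite: " + canServe(shared, 200));

        // "instance" case: the next suite's context creates its own pool.
        ExecutorService own = Executors.newFixedThreadPool(threads);
        System.out.println("own pool serves next suite: " + canServe(own, 5000));

        release.countDown();
        shared.shutdownNow();
        own.shutdownNow();
    }
}
```

In this sketch the shared pool cannot serve the second request because all of its workers are still occupied, while a pool owned by the new context serves it right away. That mirrors why the ordering of the suites and the number of available threads mattered in the report above.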
How was this patch tested?
Integration tests, manual tests