-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17100: GlobalStreamThread#start should not busy-wait #16914
Conversation
// Question: Is this a good idea? Or should we spread the latch countdown to finally blocks | ||
// Count down the latch if transitioning from CREATED to any other state | ||
if (oldState == State.CREATED && newState != State.CREATED) { | ||
initializationLatch.countDown(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gharris1727 This works (all the tests pass), but I want to double-check it with you. The other alternative would be to count down in finally
blocks across the class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other alternative would be to count down in finally blocks across the class.
It could make sense to have a finally block in the initialize
method and call countDown
in shutdown
, but the current implementation appears to have reasonable semantics.
I have a weak feeling that counting down this latch is too much for the setState method to be doing, but at the same time pushing the latch management out into the calling code risks missing a (possibly future) call-site which transitions away from CREATED.
I think i would leave this to see what the Streams folks think about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @gharris1727 -- this is not the right place for this code. I am not 100% sure right now what the best place would be, but both run()
or initialize()
look like good candidates (maybe initialize()
is better).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the countDown
logic to the finally
block of initialize
. It sounds reasonable to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems reasonable to me, and captures the intent of my ticket.
@mjsax @ableegoldman Could one of you PTAL, thanks!
// Question: Is this a good idea? Or should we spread the latch countdown to finally blocks | ||
// Count down the latch if transitioning from CREATED to any other state | ||
if (oldState == State.CREATED && newState != State.CREATED) { | ||
initializationLatch.countDown(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other alternative would be to count down in finally blocks across the class.
It could make sense to have a finally block in the initialize
method and call countDown
in shutdown
, but the current implementation appears to have reasonable semantics.
I have a weak feeling that counting down this latch is too much for the setState method to be doing, but at the same time pushing the latch management out into the calling code risks missing a (possibly future) call-site which transitions away from CREATED.
I think i would leave this to see what the Streams folks think about it.
initializationLatch.await(); | ||
} catch (final InterruptedException e) { | ||
currentThread().interrupt(); | ||
throw new IllegalStateException("Thread was interrupted during initialization", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a "new" exception, as the previous code would have just spun calling Utils.sleep and continuously throwing InterruptedExceptions. It's more informative than the other IllegalStateException thrown here, so this seems reasonable.
initializationLatch.await(); | ||
} catch (final InterruptedException e) { | ||
currentThread().interrupt(); | ||
throw new IllegalStateException("Thread was interrupted during initialization", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new IllegalStateException("Thread was interrupted during initialization", e); | |
throw new IllegalStateException("GlobalStreamThread was interrupted during initialization", e); |
@@ -436,6 +430,8 @@ private StateConsumer initialize() { | |||
} catch (final Exception fatalException) { | |||
closeStateConsumer(stateConsumer, false); | |||
startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException); | |||
} finally { | |||
initializationLatch.countDown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think to keep the existing semantics, another one of these is needed in shutdown
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to right now, given that start()
blocks and thus shutdown()
cannot be triggered as long as init()
did not finish (cf https://issues.apache.org/jira/browse/KAFKA-7380)
But maybe good to add to be future proof...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting... 💭 I have added the countdown to the shutdown()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gharris1727 are you happy with the PR? Can we merge it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @raminqaf!
Thanks for the PR @raminqaf -- merged to |
) This PR replaces a busy-wait sleep with a CountDownLatch. Reviewers: Greg Harris <[email protected]>, Matthias J. Sax <[email protected]>
) This PR replaces a busy-wait sleep with a CountDownLatch. Reviewers: Greg Harris <[email protected]>, Matthias J. Sax <[email protected]>
More detailed description
I have replaced the
sleep
method with aCountDownLatch
. It will await until the latch count reaches zero. The countdown happens in theinitialize
method in afinally
block.Summary of testing strategy
All the tests in
GlobalStreamThreadTest
pass, indicating that the class's behavior has remained unchanged.Committer Checklist (excluded from commit message)