fix: don't create threads per request #6665
Conversation
pullExecutorService.awaitTermination(
    Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
  Thread.currentThread().interrupt();
If someone has interrupted this thread, you can just skip awaiting termination of the executorService and log the exception. I don't think there's any point in resetting the interrupt flag. That's kind of the reverse of the pattern:
https://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.html
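For example, something like this (a sketch only; pullExecutorService comes from the diff above and LOG is an assumed SLF4J-style logger, not the actual code):

try {
  pullExecutorService.awaitTermination(
      Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
  // Stop waiting and just record the interruption; no need to re-assert the flag here.
  LOG.warn("Interrupted while awaiting pull executor termination", e);
}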
This is actually discussed in good detail here: https://stackoverflow.com/questions/3976344/handling-interruptedexception-in-java
I think if there's a notion that the caller of this method may want to check the status of the flag, then there's a pattern of catching InterruptedException and resetting the flag via Thread.currentThread().interrupt(), as you're doing. If there were other things further along that might block and throw InterruptedException, it might make sense to do what you're doing to effectively pass on the flag so the next thing doesn't block either. I'll leave it to you to assess that. You should probably still log the exception.
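For contrast, the restore-the-flag variant being discussed would look roughly like this (LOG is again an assumed logger, not the actual code):

try {
  pullExecutorService.awaitTermination(
      Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
  LOG.warn("Interrupted while awaiting pull executor termination", e);
  // Re-assert the flag so callers further up the stack see the interruption
  // and don't block on the next interruptible call.
  Thread.currentThread().interrupt();
}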
It's closed in the AutoCloseable of HARouting now, which is basically how Andy had fixed it.
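Roughly this shape, i.e. HARouting owns the executor and shuts it down in close() (the constructor and pool-size parameter here are illustrative, not the actual implementation):

public final class HARouting implements AutoCloseable {
  private final ExecutorService executorService;

  public HARouting(final int pullThreadPoolSize) {
    // Created once, when the engine / rest application starts.
    this.executorService = Executors.newFixedThreadPool(pullThreadPoolSize);
  }

  @Override
  public void close() {
    // Per-request callers never create or shut down their own thread pools.
    executorService.shutdown();
  }
}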
@@ -154,6 +155,7 @@ PullQueryResult executePullQuery(
    ConfiguredStatement<Query> statement,
    RoutingFilterFactory routingFilterFactory,
    RoutingOptions routingOptions,
    ExecutorService executorService,
Can we avoid passing down a general-purpose ExecutorService? It seems very easy to "abuse": if down the line someone needs a thread pool, they'll think "oh look, I can just use this one that's already here!" and "steal" threads from the pull queries.
I'm about to hop on a meeting but I'll take a deeper look in a bit. I think it might be worth refactoring HARouting so that we can pass one instance down (or create one when we create the engine).
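Something like this is what I have in mind, so nothing below the engine ever sees a raw ExecutorService (the call shape is illustrative, not the real executePullQuery signature):

// One shared instance, created alongside the engine.
final HARouting routing = new HARouting(routingFilterFactory, ksqlConfig);

// Per pull query request, reuse it instead of passing an ExecutorService down.
final PullQueryResult result = engine.executePullQuery(statement, routing, routingOptions);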
Yes, I agree; I thought about it as well. The current problem is that HARouting takes the pull physical plan as an argument so that it can execute it when the query is served locally. I haven't come up with an idea for how to refactor this.
You can pass the PullPhysicalPlan as an argument of HARouting.handlePullQuery rather than injecting it into the constructor. I think you would have to do the same with ConfiguredStatement<Query> statement, LogicalSchema outputSchema, and QueryId queryId. That should allow HARouting to be a singleton.
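In other words, something along these lines (return type and parameter order are just a sketch of the idea, not the actual signature):

PullQueryResult handlePullQuery(
    final PullPhysicalPlan pullPhysicalPlan,
    final ConfiguredStatement<Query> statement,
    final LogicalSchema outputSchema,
    final QueryId queryId);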
LGTM
Description
With the pull query physical plan refactoring, a bug was introduced where the executor service was instantiated per pull query request. I fixed that by creating it once in the KsqlRestApplication.
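As a rough sketch of the change (names are illustrative; see the diff for the actual code):

// In KsqlRestApplication: created once at startup...
this.pullExecutorService = Executors.newFixedThreadPool(pullThreadPoolSize);

// ...and shut down once, when the application stops.
pullExecutorService.shutdown();
pullExecutorService.awaitTermination(
    Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);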
Testing done
All tests pass
Reviewer checklist