Add support parallel combine in brokers #6629
Conversation
@drcrallen would you please review this?
@@ -37,6 +37,8 @@
  public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
  @Deprecated
  public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
  public static final String NUM_BROKER_PARALLEL_COMBINE_THREADS = "numBrokerParallelCombineThreads";
Is it possible to not add this configuration and instead figure out the most appropriate setting automatically?
Hmm, what is the most appropriate setting you have in mind? I guess it's possible to automatically pick something reasonable based on statistics like intermediate aggregate size, the number of rows to be combined, etc. This will be possible in the future once we have a system to collect such statistics.
It currently picks all available processing threads if this value is set to -1 (https://github.com/apache/incubator-druid/pull/6629/files#diff-d8ff07a1952a0070326e5647b4e738f9R45). Do you have a better idea?
 */
public static <T> int getNumBrokerParallelCombineThreads(Query<T> query)
{
  return parseInt(query, NUM_BROKER_PARALLEL_COMBINE_THREADS, NO_PARALLEL_COMBINE_THREADS);
From this line, it seems that parallel combine is not enabled by default.
Yes, it's not enabled by default because I'm not sure how it affects the overall performance of the cluster if it's enabled for all queries. Also, enabling parallel combine might not be good if the size of the intermediate aggregates is not large enough, which is the common Druid use case. I added this to the doc.
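For example, a single query could opt in through its query context. A hypothetical sketch (the key name comes from the diff above; ImmutableMap is Guava's):

import com.google.common.collect.ImmutableMap;
import java.util.Map;

// Hypothetical per-query opt-in: 4 reserves four combine threads, while -1
// would use all currently available processing threads (per the -1 behavior
// discussed above).
Map<String, Object> context = ImmutableMap.of("numBrokerParallelCombineThreads", 4);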
Is it possible to manage processing resources so that there is confidence that even if parallel combine is enabled for all queries, it doesn't make things much worse (i.e., the parallelization overhead is acceptable, e.g. within 10% regression in the most unfavourable cases)?
Is it possible to estimate the size of intermediate aggregates and enable parallel combine automatically when it begins to make sense?
Is it possible to manage processing resources so that there is confidence that even if parallel combine is enabled for all queries, it doesn't make things much worse (i.e., the parallelization overhead is acceptable, e.g. within 10% regression in the most unfavourable cases)?
Here is how it currently works.
- Brokers use the blockingPool to manage processing threads.
- If NUM_BROKER_PARALLEL_COMBINE_THREADS is -1 for a query, the broker reserves all currently available resources for the query. Once the query has computed the actual number of threads it needs from the available ones, it immediately releases the rest (https://github.com/apache/incubator-druid/pull/6629/files#diff-dc9ea12c892e41bbd957c77b43810ddeR103). If there are not enough threads available, the query simply runs without parallel combine.
- If NUM_BROKER_PARALLEL_COMBINE_THREADS is a positive number n, the broker reserves n threads. If there are not enough threads, the query waits for n threads to become available.
So, for your question: if NUM_BROKER_PARALLEL_COMBINE_THREADS is set to -1 for all queries, all queries can run at least without waiting for threads, which I think is the bottom line.
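A minimal sketch of this reservation policy, using stand-in names (pool, takeBatch(), available(), ResourceHolder) rather than the PR's actual API:

import java.util.List;

// Stand-in sketch of the policy above; not the PR's actual types.
List<ResourceHolder> reserveCombineResources(int requested, long timeoutMs)
{
  if (requested == -1) {
    // Take whatever is free right now, never blocking; the caller releases
    // any extras and falls back to sequential combine if this comes back empty.
    return pool.takeBatch(pool.available());
  }
  // A positive number blocks until that many resources are available.
  return pool.takeBatch(requested, timeoutMs);
}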
Is it possible to estimate the size of intermediate aggregates and enable parallel combine automatically when it begins to make sense?
Well, we can do some rough optimization based on the estimated input size and the cardinality of the grouping keys, but I'm not sure how good it will be.
In the future, I think it's possible to do advanced optimizations for queries which need blocking aggregation in historicals, like groupBy. Since, in blocking aggregation, historicals know the exact size and number of intermediate aggregates they will send to the broker, they can send that metadata before sending the actual aggregates, so that the broker can make decisions about run-time optimizations like parallel merge. Does this make sense?
How does the ForkJoinPool / parallel streams facility approach this problem? Is it applicable to parallel combine in brokers?
Well, we can do some rough optimization based on the estimated input size and the cardinality of the grouping keys, but I'm not sure how good it will be.
I think the advantage we have, and parallel streams don't, is that we can safely assume that all pieces of data to be merged take approximately the same time. So we could extrapolate based on processing some small part of the data.
@leventov I'm still not sure what your idea is meant to achieve. Currently, a broker can combine multiple queries at the same time without them blocking each other, although the number of concurrent queries is limited by the number of its HTTP threads. I'm not sure how your idea is better than what we have now.
Do you just want to avoid the manual setting of NUM_BROKER_PARALLEL_COMBINE_THREADS? My intention was that we need a way to explicitly specify the number of threads for combining, because automatic optimization might be wrong or not enough. (This is especially true in this PR because the only available option for auto optimization is to set NUM_BROKER_PARALLEL_COMBINE_THREADS to -1.) However, if users really want to use some particular number of threads, there should be enough threads available at query time. Otherwise, the query will be blocked and eventually fail.
What do you think about this? And do you think it makes sense to add a better optimization mechanism later?
It's essentially impossible to set NUM_BROKER_PARALLEL_COMBINE_THREADS "right" even if it is done on a per-query basis, which nobody will ever do. At one moment the broker could be idle; the next moment a lot of queries arrive, and setting NUM_BROKER_PARALLEL_COMBINE_THREADS will only make things worse.
because automatic optimization might be wrong or not enough
My intention is to employ techniques from ForkJoinPool. When you use parallelStream(), it basically cannot parallelize wrongly or insufficiently. It may not be 100% optimal because of coarse partitioning granularity (you can only split a piece of work into two parts, not three, for example), but it is always within 10-20% of the optimum. And that restriction shouldn't even apply to combine on brokers.
The only way to really abuse parallelStream() is to pass it something that can be completed in less than 10ms. But that can be safely avoided in parallel combine too, as I showed above.
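For reference, a minimal example of the behavior being described; the caller never picks a thread count, and the List here is purely illustrative:

import java.util.Arrays;
import java.util.List;

// The caller never specifies a degree of parallelism; the common
// ForkJoinPool splits the work and steals tasks adaptively.
List<Long> costs = Arrays.asList(3L, 1L, 4L, 1L, 5L);
long total = costs.parallelStream()
                  .mapToLong(Long::longValue)
                  .sum();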
@leventov sorry, maybe I wasn't clear. What I don't understand is how your idea makes the combine faster than at least the current implementation in master.
I talked about this with @jon-wei, and maybe the following is what you're thinking.
You may have in mind the below architecture for parallel combine. H{n} and F{n} represent an HTTP thread and a fork-join task, respectively. S{n} is a sequence.
H1 H2 H3 H4 H5 (HTTP thread pool)
\ \ | / /
+----------+
| F1 F2 F3 | (Fork Join Pool)
+----------+
/ / / \ \ \
S1 S2 S3 S4 S5 S6
The fork-join tasks can combine sequences for any queries, but the sequences combined together must belong to the same query. Also, each fork-join task computes for a limited time, like 10ms, as you said. Once they complete their task, they return the result to the proper HTTP thread. The HTTP threads combine the results from the fork-join tasks again to get the final result. For a query, since the combine is computed hierarchically across an HTTP thread and fork-join tasks, it can be done in parallel and be faster than the current implementation in master. If this is what you're thinking, I think it's a good idea.
I didn't understand what you mean, but we could do an initial "sampling" merge of just two sequences and then, based on the timing and the number of idle threads in the FJP (getRunningThreadCount(), getActiveThreadCount(), etc.), decide on the degree of merge, similarly to what you already do in ParallelMergeCombineSequence. So it's the same hierarchical merge, just done in the FJP with "yields" every 10ms, so that small queries that arrive in parallel with a humongous query have a chance to get in and don't have to wait long for the big query's combine to complete because all processing threads are busy.
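A rough sketch of that sampling idea, where mergeTwo() and chooseDegree() are hypothetical helpers rather than anything in this PR:

import java.util.concurrent.ForkJoinPool;

// Hypothetical sketch: time one small merge, then pick the combine degree
// from the measured cost and the pool's current idleness.
ForkJoinPool fjp = ForkJoinPool.commonPool();
long start = System.nanoTime();
Sequence<T> sampled = mergeTwo(sequences.get(0), sequences.get(1));
long sampleCostNanos = System.nanoTime() - start;
int idleThreads = fjp.getParallelism() - fjp.getActiveThreadCount();
int combineDegree = chooseDegree(sampleCostNanos, idleThreads);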
@leventov got it. I think I now understand what you mean, though it's still not clear how to reserve the threads needed for a hierarchical merge with a ForkJoinPool. (If the hierarchical merge is done as in this PR, it needs a sort of gang scheduling: once we decide to use some threads for a query, they should all be available at the same time.)
Anyway, I like this idea and will change my implementation. I'll probably close this and raise another PR. Thanks.
Got through a chunk of it but ran out of time to continue. My main question about the implementation is as follows: there is already a large body of effort and thought in the java.lang.Thread* and java.util.concurrent packages that seems to be purposefully avoided here. I haven't been able to fully dig into ParallelMergeCombineSequence yet, but it seems like a lot of those Java concepts should be usable here. I'll try to give some examples when I can.
int available();

/**
 * Poll all available resources from the pool. If there's no available resource, it returns an empty list.
The BlockingQueue interface has a drainTo method that does something similar. Can you add documentation on how the behavior of this method is similar to or different from drainTo?
May I ask why you think it should be documented? BlockingPool is our own API and it doesn't use BlockingQueue.
I'm trying to think as a new developer: what would reduce the total spin-up time to get to effective contributions? There are a lot of methods which are kind of like core Java methods but a little different, and it gets hard to differentiate what is important and what is not.
IMHO, "this is a subset of BlockingQueue with some added features, but we don't want to depend on keeping up with BlockingQueue upstream changes" is an easier story than "we made up our own interfaces that behave differently from other things you've seen, and you have to learn how the new interfaces are and are not enforced".
I don't think this is a subset of BlockingQueue. They're similar only in terms of the blocking operation, which doesn't make them similar overall. A queue and a pool are different things: the key characteristic of a queue is that it's FIFO, while there's no order when inserting items into or getting items from a pool.
I don't think anyone would guess BlockingPool's behavior by comparing it with BlockingQueue. If something is not clear, it just means we need to add more detailed docs.
{
  return objects.size();
  final ReentrantLock lock = this.lock;
  lock.lock();
Why bother locking if the size returned is immediately going to be invalid? That is, some other thread may have already changed the size.
Ah, that's a good point. I changed this to @VisibleForTesting and changed takeBatch() to also return the number of remaining resources. Thanks!
@@ -99,6 +99,11 @@ public static DateTime of(String instant)
  return new DateTime(instant, ISOChronology.getInstanceUTC());
}

public static DateTime of(String format, Object... formatArgs)
This seems a little unnatural to me, IMHO. A DateTime.of that actually does string formatting is a bit surprising. I propose changing the method name to something more descriptive of what the method does, or just using StringUtils.format in the callers.
Hmm, would you elaborate more on why it's unnatural?
It seems easier to add a factory method that accepts all parts from year to second, as in http://joda-time.sourceforge.net/apidocs/org/joda/time/DateTime.html#DateTime(int,%20int,%20int,%20int,%20int,%20int).
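For illustration, such a factory might look like this — a sketch using the Joda-Time constructor linked above plus Druid's UTC convention, not an existing method:

import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;

// Sketch of the suggested factory: explicit date parts instead of string
// formatting, keeping the UTC convention of DateTimes.of(String).
public static DateTime of(int year, int month, int day, int hour, int minute)
{
  return new DateTime(year, month, day, hour, minute, ISOChronology.getInstanceUTC());
}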
I can add it, but I'm not sure why it's better. We have a similar method for Interval:
public static Interval of(String format, Object... formatArgs)
{
return of(StringUtils.format(format, formatArgs));
}
That other method was added recently. It is used in nine places in tests. Six of them are moot because the parameters are constants, so Intervals.of(String) could be used instead. In the other three places, I would say it would be better to use new Interval(DateTimes.of(...), DateTimes.of(...)), where DateTimes.of() is what is suggested above.
Minor but real disadvantages of the vararg methods are that they are not covered by the corresponding IntelliJ inspection (see "MalformedFormatString" in inspectionProfiles/Druid.xml) and that they have extra performance cost, while they could be replaced trivially.
I agree with @leventov on these points
if (numParallelCombineThreads > 0) {
  final ReserveResult reserveResult = processingThreadResourcePool.reserve(query, numParallelCombineThreads);
  if (!reserveResult.isOk()) {
    throw new ISE(
Does this really need to hard fail? Can it just warn and fall back to the default behavior?
The idea behind this is that Druid currently can't make the optimal resource-planning decision by itself. (Optimal automatic resource allocation, I think, would at least mean that Druid can allocate proper resources based on the query's priority and how heavy its workload is.) So, users should be careful when allocating resources. If there aren't enough resources for this particular query, it means the user's resource planning failed, and the query should fail.
} else if (numParallelCombineThreads == QueryContexts.NO_PARALLEL_COMBINE_THREADS) {
  return sequentialMerge(sequencesByInterval);
} else {
  throw new ISE(
Suggest making the check simply
if (numParallelCombineThreads == QueryContexts.NO_PARALLEL_COMBINE_THREADS) {
return sequentialMerge(sequencesByInterval);
}
final ReserveResult reserveResult = processingThreadResourcePool.reserve(query, numParallelCombineThreads);
if (reserveResult.isOk()) {
return parallelMerge(sequencesByInterval, reserveResult.getResources());
} else {
return sequentialMerge(sequencesByInterval);
}
and have processingThreadResourcePool.reserve handle the positive and negative cases (negative meaning all available, positive meaning no more than that number).
I don't think this is a good change. The numParallelCombineThreads option is used only by CachingClusteredClient, so it should be the one responsible for checking whether the given option is valid.
@@ -40,9 +40,10 @@ public BaseSequence(
public <OutType> OutType accumulate(OutType initValue, final Accumulator<OutType, T> fn)
{
  IterType iterator = maker.make();
  OutType accumulated = initValue;
much cleaner!
 */
public class ParallelMergeCombineSequence<T> extends YieldingSequenceBase<T>
{
  private static final int MINIMUM_LEAF_COMBINE_DEGREE = 2;
It is possible this is too aggressive. For example, merging 16 in series vs. merging 8x2, 4x2, 2x2 plus a final merge may not be worth it compared to simply merging 16. In my own tests I found that simply splitting into serial groups of 100 and doing a serial merge on the groups worked pretty darn well. That's only two levels of merging.
This is just a starting point for finding the proper degree for leaf nodes. If numBrokerParallelCombineThreads = 1, then a single thread will combine all input sequences.
If the main CPU cost lies in the combine function, does it really matter? In the end, we make the same number of calls to this function, if I understand it right.
If the cost of the infrastructure does matter, the big wins will probably be:
- Move off blocking: implement async value interchange between levels, just in this particular small part of the system. RxJava may already have the necessary utilities.
- Optimize MergeSequence: don't remove and re-add elements from/to the head, just move the element up/down in the heap. This technique is already used in IntIteratorUtils.MergeIntIterator and MergingRowIterator (sketched below).
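A sketch of the in-place heap update just described, where advance(), siftDown(), and removeHead() are hypothetical helpers rather than existing Druid code:

// Replace the head in place and sift it down instead of doing a
// remove()/add() pair per output row.
Yielder<T> head = advance(heap[0]);   // consume the head's current value
if (head.isDone()) {
  removeHead(heap);                   // shrink only when an input is exhausted
} else {
  heap[0] = head;
  siftDown(heap, 0);                  // restore heap order in O(log n)
}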
If the main CPU cost lies in the combine function, does it really matter? In the end, we make the same number of calls to this function, if I understand it right.
Good point. I think you're right. I'll test it.
My point is, if you fork too much, there are scenarios where the maintenance of the forks exceeds the work done in the forks themselves. It isn't a problem until it's a problem, though, and if the macro benchmarks show overall improvement, I'm fine with real-deployment profiling to see whether the overhead is really there before trying to change or tune the maximum number of combinations in a single pass.
core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombineSequence.java
  return nextVal != null;
}
catch (InterruptedException e) {
  throw new RuntimeException(e);
Thread.currentThread().interrupt()
Would you tell me why? Is there anything else that should handle InterruptedException specially?
Everywhere that catches an Exception that could be an InterruptedException, or anywhere that catches InterruptedException directly, will swallow the interrupted flag on the thread. You can search the code for usages of java.lang.Thread#interrupt for other places where this is done. It is a nasty source of latent bugs, since it can easily result in an interrupt being lost and a retry loop clogging up a shutdown or recovery cycle of some kind. There's a reason Scala includes InterruptedException in its list of fatal exceptions.
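A minimal sketch of the fix being suggested:

catch (InterruptedException e) {
  // Restore the flag so code further up the stack can still observe the
  // interrupt; wrapping it in a RuntimeException alone would swallow it.
  Thread.currentThread().interrupt();
  throw new RuntimeException(e);
}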
I know. My question is: what's the point of the thread interrupting itself again? This exception is only thrown by the threads that run the runnable in the background.
private T nextVal;

@Override
public boolean hasNext()
This should be true as long as next() is called up to sequenceList.size() times, right? Do you need to block during the hasNext call vs. in the next call?
No, all values from the sequences of sequenceList are combined and stored in queue. hasNext() should return true until all values are aggregated and this queue becomes empty.
We can't block in next() since we can't know how many values will end up in queue.
I tried looking through this code base, and other code bases, for some direction on whether blocking in hasNext is good or bad. I didn't find strong evidence either way. Luckily, iterators commonly have a loop like
while (it.hasNext())
{
Object next = it.next();
}
But from an implementation-compliance standpoint, the following code must yield the same result:
while (it.hasNext() && it.hasNext() && it.hasNext() && it.hasNext() && it.hasNext())
{
Object next = it.next();
}
and I think this implementation violates that.
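One way to keep hasNext() idempotent while still blocking — a sketch assuming a hypothetical queue field and an end-of-stream SENTINEL marker, neither of which is necessarily how this PR does it:

// Cache the polled value so repeated hasNext() calls have no side effects.
private boolean done;

@Override
public boolean hasNext()
{
  if (done) {
    return false;           // latch the end of stream
  }
  if (nextVal != null) {
    return true;            // a cached value keeps repeated calls true
  }
  try {
    final Object polled = queue.take(); // blocks for a value or the sentinel
    if (polled == SENTINEL) {
      done = true;
      return false;
    }
    nextVal = (T) polled;
    return true;
  }
  catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}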
@drcrallen thank you for taking a look. I'll check your comments soon. For this question: if you're talking about ForkJoinPool, I don't think it's appropriate for this kind of use case. The algorithm implemented in this PR is a sort of pipelined combine of sorted data: each thread combines data from its children, and a parent and its children threads can run at the same time. I took this approach because the final results should be streamed to the query client. ForkJoinPool, however, is appropriate for divide-and-conquer style algorithms. For example, it's useful when summing a large sequence of integers: it can automatically split the sequence into several small chunks and sum them in parallel, and the results of summing the small chunks are then aggregated again. There, summing the small chunks and aggregating their results can't be done at the same time. What do you think?
PriorityQueue<Yielder<T>> pQueue = baseSequences.accumulate(
    new PriorityQueue<>(
        32,
If anyone finds this comment through code archeology, this constant was me just poking around and trying stuff. It is not thoroughly tested :(
Thanks. Good to know.
@drcrallen thanks for the review, but I'll change my implementation based on the discussion with @leventov. (See #6629 (comment).) I'll probably close this and raise another PR.
I'm closing this PR. Will raise a new one later.
This PR solves the same problem as #5913, but uses a different approach.
ParallelMergeCombineSequence is newly added; it merges and combines the underlying sequences in parallel. As in #4704, this sequence internally creates a processing tree consisting of processing threads connected by blocking queues. The leaf threads combine streams from historicals and store the computed aggregates in their blocking queues. The other threads read values from the blocking queues of their children and store the computed aggregates in their own blocking queues. Finally, the HTTP thread reads the final aggregates from the blocking queue of the root thread. I think this is a better approach than the one used in #5913.
Here is the benchmark result. Please note that this benchmark tests only the performance of the broker merge part, so the actual impact on the total query time would be smaller than the benchmark result suggests.
Other changes are:
- Added CachingClusteredClientBenchmark, which can reproduce the above benchmark result.
- Removed OrderedMergeSequence.
- Renamed ParallelCombiner to ParallelCombines.
- Moved createMergeFn() from ResultMergeQueryRunner to QueryToolChest.
- Changed QueryableDruidServer to an interface and added RemoteDruidServer to write the benchmark.
This adds some new query contexts, and so is labelled as Design Review.