
Add support for parallel combine in brokers #6629

Closed
wants to merge 6 commits

Conversation

jihoonson
Contributor

This PR solves the same problem as #5913, but uses a different approach. It adds a new ParallelMergeCombineSequence, which merges and combines the underlying sequences in parallel. As in #4704, this sequence internally creates a processing tree consisting of processing threads connected by blocking queues. The leaf threads combine streams from historicals and store the computed aggregates in their blocking queues. Other threads read values from the blocking queues of their children and store the computed aggregates in their own blocking queues. Finally, the HTTP thread reads the final aggregates from the blocking queue of the root thread.
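To make the processing-tree idea concrete, here is a minimal sketch of one node of such a tree. It is illustrative only: it assumes two children, timestamp-keyed long aggregates, and a DONE sentinel, and is not the actual ParallelMergeCombineSequence code.

import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// One node of the combine tree: it reads timestamp-ordered partial aggregates from its
// children's blocking queues, sums values that share a timestamp, and writes the combined
// stream to its own queue for the parent (or, at the root, the HTTP thread) to consume.
class CombineNode implements Runnable
{
  // End-of-stream marker; real aggregates are assumed to have timestamps < Long.MAX_VALUE.
  static final Entry<Long, Long> DONE = new SimpleEntry<>(Long.MAX_VALUE, 0L);

  private final BlockingQueue<Entry<Long, Long>> left;
  private final BlockingQueue<Entry<Long, Long>> right;
  private final BlockingQueue<Entry<Long, Long>> output = new LinkedBlockingQueue<>(4096);

  CombineNode(BlockingQueue<Entry<Long, Long>> left, BlockingQueue<Entry<Long, Long>> right)
  {
    this.left = left;
    this.right = right;
  }

  BlockingQueue<Entry<Long, Long>> getOutput()
  {
    return output;
  }

  @Override
  public void run()
  {
    try {
      Entry<Long, Long> l = left.take();
      Entry<Long, Long> r = right.take();
      while (l != DONE || r != DONE) {
        if (l != DONE && r != DONE && l.getKey().equals(r.getKey())) {
          // Same grouping key on both sides: combine before emitting.
          output.put(new SimpleEntry<>(l.getKey(), l.getValue() + r.getValue()));
          l = left.take();
          r = right.take();
        } else if (l.getKey() <= r.getKey()) {
          output.put(l);
          l = left.take();
        } else {
          output.put(r);
          r = right.take();
        }
      }
      output.put(DONE); // propagate end-of-stream to the parent
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}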

I think this is a better approach than the one used in #5913 because:

  • All query types can benefit from the same code path without adding any new aggregation interface to QueryRunner or QueryToolChest. Currently, Timeseries, TopN, and GroupBy queries are supported.
  • Since it uses the existing processing thread pool, it doesn't introduce new configurations for processing pools.
  • The size of the blocking queue used to materialize intermediate aggregates can be controlled by a user configuration.

Here is the benchmark result. Please note that this benchmark tests only the performance of the broker merge part, so the actual impact on total query time would be smaller than the benchmark result suggests.

Benchmark                                        (brokerParallelMergeQueueSize)  (numProcessingThreads)  (numServers)  (queryGranularity)  (rowsPerSegment)  Mode  Cnt        Score       Error  Units
CachingClusteredClientBenchmark.groupByQuery                               5120                       4             8                 all             75000  avgt   30   714863.287 ± 58633.480  us/op
CachingClusteredClientBenchmark.groupByQuery                               5120                       2             8                 all             75000  avgt   30  1167741.128 ± 19310.619  us/op
CachingClusteredClientBenchmark.groupByQuery                               5120                       1             8                 all             75000  avgt   30  1241544.213 ± 48946.858  us/op
CachingClusteredClientBenchmark.groupByQuery                              10240                       4             8                 all             75000  avgt   30   726906.643 ± 67856.413  us/op
CachingClusteredClientBenchmark.groupByQuery                              10240                       2             8                 all             75000  avgt   30  1094897.814 ± 44889.933  us/op
CachingClusteredClientBenchmark.groupByQuery                              10240                       1             8                 all             75000  avgt   30  1187752.397 ± 37929.303  us/op
CachingClusteredClientBenchmark.groupByQuery                              20480                       4             8                 all             75000  avgt   30   717070.371 ± 95373.466  us/op
CachingClusteredClientBenchmark.groupByQuery                              20480                       2             8                 all             75000  avgt   30  1047820.065 ± 72792.607  us/op
CachingClusteredClientBenchmark.groupByQuery                              20480                       1             8                 all             75000  avgt   30  1165636.716 ± 25094.569  us/op
CachingClusteredClientBenchmark.timeseriesQuery                            5120                       4             8                 all             75000  avgt   30     4241.572 ±    61.572  us/op
CachingClusteredClientBenchmark.timeseriesQuery                            5120                       2             8                 all             75000  avgt   30     7986.811 ±   127.783  us/op
CachingClusteredClientBenchmark.timeseriesQuery                            5120                       1             8                 all             75000  avgt   30     7829.323 ±    66.921  us/op
CachingClusteredClientBenchmark.timeseriesQuery                           10240                       4             8                 all             75000  avgt   30     3534.114 ±    29.147  us/op
CachingClusteredClientBenchmark.timeseriesQuery                           10240                       2             8                 all             75000  avgt   30     7883.849 ±    65.148  us/op
CachingClusteredClientBenchmark.timeseriesQuery                           10240                       1             8                 all             75000  avgt   30     7870.191 ±    98.308  us/op
CachingClusteredClientBenchmark.timeseriesQuery                           20480                       4             8                 all             75000  avgt   30     3543.012 ±    28.043  us/op
CachingClusteredClientBenchmark.timeseriesQuery                           20480                       2             8                 all             75000  avgt   30     7908.610 ±    78.611  us/op
CachingClusteredClientBenchmark.timeseriesQuery                           20480                       1             8                 all             75000  avgt   30     9725.952 ±    59.340  us/op
CachingClusteredClientBenchmark.topNQuery                                  5120                       4             8                 all             75000  avgt   30   349675.258 ±  3556.410  us/op
CachingClusteredClientBenchmark.topNQuery                                  5120                       2             8                 all             75000  avgt   30   528744.700 ±  6688.440  us/op
CachingClusteredClientBenchmark.topNQuery                                  5120                       1             8                 all             75000  avgt   30   580475.864 ±  6444.335  us/op
CachingClusteredClientBenchmark.topNQuery                                 10240                       4             8                 all             75000  avgt   30   339018.234 ±  2702.056  us/op
CachingClusteredClientBenchmark.topNQuery                                 10240                       2             8                 all             75000  avgt   30   513790.033 ±  5519.426  us/op
CachingClusteredClientBenchmark.topNQuery                                 10240                       1             8                 all             75000  avgt   30   545196.149 ±  7395.862  us/op
CachingClusteredClientBenchmark.topNQuery                                 20480                       4             8                 all             75000  avgt   30   340100.347 ± 16336.004  us/op
CachingClusteredClientBenchmark.topNQuery                                 20480                       2             8                 all             75000  avgt   30   536705.227 ±  7110.629  us/op
CachingClusteredClientBenchmark.topNQuery                                 20480                       1             8                 all             75000  avgt   30   562068.604 ±  8781.368  us/op

Other changes are:

  • Added CachingClusteredClientBenchmark, which can reproduce the benchmark results above.
  • Removed the unused OrderedMergeSequence.
  • Extracted the common code to build a combine tree from ParallelCombiner into ParallelCombines.
  • Moved createMergeFn() from ResultMergeQueryRunner to QueryToolChest.
  • Changed QueryableDruidServer to an interface and added RemoteDruidServer for the benchmark.
  • Added unit tests for parallel merging of timeseries, topN, and groupBy queries.

This adds some new query context parameters, and so is labelled Design Review.

@jihoonson
Contributor Author

@drcrallen would you please review this?

@fjy added this to the 0.13.1 milestone Nov 19, 2018
@gianm requested a review from drcrallen November 27, 2018 22:53
@@ -37,6 +37,8 @@
public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
@Deprecated
public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
public static final String NUM_BROKER_PARALLEL_COMBINE_THREADS = "numBrokerParallelCombineThreads";
Member

Is it possible to not add this configuration and figure out the most appropriate setting automatically?

Contributor Author

Hmm, what is the most appropriate setting you have in mind? I guess it's possible to automatically pick something reasonable based on statistics like intermediate aggregate size, the number of rows to be combined, etc. This will be possible in the future once we have a system to collect such statistics.

It currently picks all available processing threads if this value is set to -1 (https://github.com/apache/incubator-druid/pull/6629/files#diff-d8ff07a1952a0070326e5647b4e738f9R45). Do you have a better idea?

*/
public static <T> int getNumBrokerParallelCombineThreads(Query<T> query)
{
return parseInt(query, NUM_BROKER_PARALLEL_COMBINE_THREADS, NO_PARALLEL_COMBINE_THREADS);
Member

From this line it seems that parallel combine is not enabled by default.

Contributor Author

Yes, it's not enabled by default because I'm not sure how it would affect the overall performance of the cluster if it were enabled for all queries. Also, enabling parallel combine might not be worthwhile if the size of the intermediate aggregates is not large enough, which is the common Druid use case. I added this to the docs.

Member

Is it possible to manage processing resources so that there is confidence that even if parallel combine is enabled for all queries, it doesn't make things much worse (i.e. the parallelization overhead is acceptable, e.g. within a 10% regression in the most unfavourable cases)?

Is it possible to estimate the size of intermediate aggregates and enable parallel combine automatically when it begins to make sense?

Contributor Author
@jihoonson Dec 7, 2018

Is it possible to manage processing resources so that there is confidence that even if parallel combine is enabled for all queries, it doesn't make things much worse (i.e. the parallelization overhead is acceptable, e.g. within a 10% regression in the most unfavourable cases)?

Here is how it currently works.

  • Brokers use the blockingPool to manage processing threads.
  • If NUM_BROKER_PARALLEL_COMBINE_THREADS = -1 for a query, the broker reserves all currently available resources for the query. Once the query computes the number of threads it actually needs among the available ones, it immediately releases the unneeded ones (https://github.com/apache/incubator-druid/pull/6629/files#diff-dc9ea12c892e41bbd957c77b43810ddeR103). If there are not enough threads available, the query is simply run without parallel combine.
  • If NUM_BROKER_PARALLEL_COMBINE_THREADS is a positive number n, the broker reserves n threads. If there aren't enough threads, the query waits for n threads to become available.

So, for your question: if NUM_BROKER_PARALLEL_COMBINE_THREADS is set to -1 for all queries, all queries can at least run without waiting for threads, which I think is the bottom line.
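Roughly, the decision looks like the following self-contained illustration, which uses a Semaphore as a stand-in for the broker's processing-thread pool (the real code uses BlockingPool; the class and method names here are illustrative, not its API):

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Stand-in for the broker's pool of processing threads.
class CombineThreadReservation
{
  private final Semaphore processingThreads;

  CombineThreadReservation(int poolSize)
  {
    this.processingThreads = new Semaphore(poolSize);
  }

  // Returns the number of threads reserved for the query; 0 means "fall back to the sequential merge".
  int reserve(int numParallelCombineThreads, long timeoutMs) throws InterruptedException
  {
    if (numParallelCombineThreads < 0) {
      // -1: take whatever is free right now, never block.
      return processingThreads.drainPermits();
    }
    // n > 0: wait until exactly n threads are available (the PR itself fails the query in that case).
    return processingThreads.tryAcquire(numParallelCombineThreads, timeoutMs, TimeUnit.MILLISECONDS)
           ? numParallelCombineThreads
           : 0;
  }

  void release(int reserved)
  {
    processingThreads.release(reserved);
  }
}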

Is it possible to estimate the size of intermediate aggregates and enable parallel combine automatically when it begins to make sense?

Well, we can do some rough optimization based on the estimated input size and the cardinality of the grouping keys, but I'm not sure how good it would be.

In the future, I think it's possible to do advanced optimizations for queries which need blocking aggregation in historicals, like groupBy. Since, in blocking aggregation, historicals know the exact size and number of the intermediate aggregates they will send to the broker, they can send that metadata before sending the actual aggregates, so that the broker can make run-time optimization decisions such as whether to use parallel merge. Does this make sense?

Member

How does the ForkJoinPool / parallel streams facility approach this problem? Is it applicable to parallel combine in brokers?

Well, we can do some rough optimization based on the estimated input size and the cardinality of the grouping keys, but I'm not sure how good it would be.

I think the advantage that we have, and parallel streams don't, is that we can safely assume that all pieces of data to be merged take approximately the same time to process. So we could extrapolate based on processing some small part of the data.

Contributor Author

@leventov I'm still not sure what your idea is meant to achieve. Currently, a broker can combine results for multiple queries at the same time without them blocking each other, although the number of concurrent queries is limited by the number of its HTTP threads. I'm not sure how your idea improves on that.

Do you just want to avoid the manual setting of NUM_BROKER_PARALLEL_COMBINE_THREADS? My intention was that we need a way to explicitly specify the number of threads for combine because automatic optimization might be wrong or not enough. (This is especially true in this PR because the only available option for automatic optimization is to set NUM_BROKER_PARALLEL_COMBINE_THREADS to -1.) However, if users really want to use some particular number of threads, there should be enough threads available at query time. Otherwise, the query will block and eventually fail.

What do you think about this? And do you think it makes sense to add a better optimization mechanism later?

Member

It's essentially impossible to set NUM_BROKER_PARALLEL_COMBINE_THREADS "right" even on a query-per-query basis, and nobody will ever do that. One moment the broker could be idle, the next moment a lot of queries arrive, and the NUM_BROKER_PARALLEL_COMBINE_THREADS setting will only make things worse.

because automatic optimization might be wrong or not enough

My intention is to employ techniques from ForkJoinPool. When you use parallelStream(), it basically cannot parallelize wrongly or insufficiently. It may not be 100% optimal because of coarse partitioning granularity (you can only split a piece of work into two parts, not three, for example), but it is always within 10-20% of the optimum. And that limitation actually should not apply to combine on brokers.

The only way to really abuse parallelStream() is to pass it something that could be completed in less than 10 ms. But that could be safely avoided in parallel combine too, as I showed above.

Contributor Author

@leventov sorry, maybe I wasn't clear. What I don't understand is how your idea makes the combine faster than, at the least, the current implementation in master.

I talked about this with @jon-wei and maybe the following is what you're thinking.

You may be thinking of the architecture below for parallel combine. H{n} and F{n} represent an HTTP thread and a fork join task, respectively. S{n} is a sequence.

 H1 H2 H3 H4 H5 (HTTP thread pool)
  \  \ | /  /
  +----------+
  | F1 F2 F3 | (Fork Join Pool)
  +----------+
 /  /  / \  \  \
S1 S2 S3 S4 S5 S6

The fork join tasks can combine sequences for any query, but the sequences combined together must belong to the same query. Also, each fork join task computes for a limited time, like 10 ms, as you said. Once a task completes, it returns its result to the proper HTTP thread. The HTTP threads then combine the results from the fork join tasks again to get the final result. For a given query, since the combine is computed hierarchically across an HTTP thread and fork join tasks, it can be done in parallel and be faster than the current implementation in master. If this is what you mean, I think it's a good idea.

Member

I didn't quite understand what you mean, but we could do an initial "sampling" merge of just two sequences, and then, based on the timing and the number of idle threads in the FJP (getRunningThreadCount(), getActiveThreadCount(), etc.), decide on the degree of merge, similarly to what you already do in ParallelMergeCombineSequence. So it's the same hierarchical merge, just done in the FJP with "yields" every 10 ms, so that small queries arriving in parallel with a humongous query have a chance to run and don't have to wait until that big query's combine is completed because all the processing threads are busy.
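As a rough sketch (not an actual implementation, and leaving out the sampling step and the periodic 10 ms yields), a hierarchical merge expressed as fork join tasks could look like this:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Splits the set of sorted result lists in half, forks the left half, computes the right half
// in the current worker, and then merges the two halves' results.
class MergeTask extends RecursiveTask<List<Long>>
{
  private final List<List<Long>> sortedInputs;

  MergeTask(List<List<Long>> sortedInputs)
  {
    this.sortedInputs = sortedInputs;
  }

  @Override
  protected List<Long> compute()
  {
    if (sortedInputs.isEmpty()) {
      return new ArrayList<>();
    }
    if (sortedInputs.size() == 1) {
      return sortedInputs.get(0);
    }
    int mid = sortedInputs.size() / 2;
    MergeTask leftTask = new MergeTask(sortedInputs.subList(0, mid));
    MergeTask rightTask = new MergeTask(sortedInputs.subList(mid, sortedInputs.size()));
    leftTask.fork();                        // run the left half asynchronously
    List<Long> right = rightTask.compute(); // run the right half in this worker
    return mergeTwo(leftTask.join(), right);
  }

  private static List<Long> mergeTwo(List<Long> a, List<Long> b)
  {
    List<Long> merged = new ArrayList<>(a.size() + b.size());
    int i = 0;
    int j = 0;
    while (i < a.size() && j < b.size()) {
      merged.add(a.get(i) <= b.get(j) ? a.get(i++) : b.get(j++));
    }
    while (i < a.size()) {
      merged.add(a.get(i++));
    }
    while (j < b.size()) {
      merged.add(b.get(j++));
    }
    return merged;
  }
}

// Usage: List<Long> merged = ForkJoinPool.commonPool().invoke(new MergeTask(sortedInputs));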

Contributor Author

@leventov got it. I think I now understand what you mean, though it's still not clear how to reserve the threads needed for hierarchical merge with ForkJoinPool. (If hierarchical merge is done as in this PR, it needs a sort of gang scheduling: once we decide to use some threads for a query, they should all be available at the same time.)

Anyway, I like this idea and will change my implementation. I'll probably close this and raise another PR. Thanks.

Contributor
@drcrallen left a comment

Got through a chunk of it but ran out of time to continue. My main question about the implementation is as follows: there is already a large body of effort and thought in the java.lang.Thread* and java.util.concurrent packages that seems to be purposefully avoided here. I haven't been able to fully dig into ParallelMergeCombineSequence yet, but it seems like a lot of the standard Java concepts should be usable here. I'll try to give some examples when I can.

int available();

/**
* Poll all available resources from the pool. If there's no available resource, it returns an empty list.
Contributor

The BlockingQueue interface has a drainTo method that does something similar. Can you add documentation on how the behavior of this method is similar or different compared to drainTo?

Contributor Author

May I ask why you think it should be documented? BlockingPool is our own API and it doesn't use BlockingQueue.

Contributor

I'm trying to think as a new developer: what would reduce the total spin-up time needed to make effective contributions? There are a lot of methods which are kind of like core Java methods but are a little different, and it gets hard to differentiate what is important and what is not.

IMHO "This is a subset of BlockingQueue with some other added features, but we don't want to depend on keeping up with BlockingQueue upstream changes" is an easier story than "We made up our own interfaces that behave different than other things you've seen, and you have to learn about how the new interfaces are and are not enforced"

Contributor Author

I don't think this is a subset of BlockingQueue. They're only similar in that both have a blocking operation, but that doesn't make them similar overall. The queue and the pool are different: the key characteristic of a queue is that it's FIFO, but there's no ordering for inserting/getting items to/from the pool.

I don't think anyone would guess BlockingPool's behavior by comparing it with BlockingQueue. If something is not clear, it just means we need to add more detailed docs.

{
return objects.size();
final ReentrantLock lock = this.lock;
lock.lock();
Contributor

Why bother locking if the size returned is immediately going to be invalid? That is, some other thread may have already changed the size by the time the caller uses it.

Contributor Author
@jihoonson Dec 12, 2018

Ah, that's a good point. I made this VisibleForTesting and changed takeBatch() to also return the number of remaining resources. Thanks!

@@ -99,6 +99,11 @@ public static DateTime of(String instant)
return new DateTime(instant, ISOChronology.getInstanceUTC());
}

public static DateTime of(String format, Object... formatArgs)
Contributor

This seems a little unnatural IMHO. A DateTime.of that actually does string formatting is a bit surprising. I propose changing the method name to something more descriptive of what the method does, or just using StringUtils.format in the callers.

Contributor Author

Hmm, would you elaborate more on why it's unnatural?

Member

It seems easier to add a factory method that accepts all parts from year to second, as in http://joda-time.sourceforge.net/apidocs/org/joda/time/DateTime.html#DateTime(int,%20int,%20int,%20int,%20int,%20int).
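For example, something along these lines; this is only a sketch, assuming the factory would live in Druid's DateTimes utility class and use the UTC chronology like the existing of(String):

import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;

public class DateTimes
{
  // Sketch of the suggested factory; the exact signature (and second/millis overloads) is assumed.
  public static DateTime of(int year, int month, int day, int hour, int minute)
  {
    return new DateTime(year, month, day, hour, minute, ISOChronology.getInstanceUTC());
  }
}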

Contributor Author

I can add it, but I'm not sure why it's better. We have a similar method for Interval.

  public static Interval of(String format, Object... formatArgs)
  {
    return of(StringUtils.format(format, formatArgs));
  }

Member

That other method was added recently. It is used in nine places in tests. Six of them are moot, because the parameters are constants and Intervals.of(String) could be used instead. In the other three places, I would say it would be better to use new Interval(DateTimes.of(...), DateTimes.of(...)), where DateTimes.of() is the factory suggested above.

Minor but real disadvantages of the vararg methods are that they are not covered by the corresponding IntelliJ inspection (see "MalformedFormatString" in inspectionProfiles/Druid.xml) and that they carry an extra performance cost, while they could be replaced trivially.

Contributor

I agree with @leventov on these points

if (numParallelCombineThreads > 0) {
final ReserveResult reserveResult = processingThreadResourcePool.reserve(query, numParallelCombineThreads);
if (!reserveResult.isOk()) {
throw new ISE(
Contributor

Does this really need to hard fail? Can it just warn and fall back to the default behavior?

Contributor Author

The idea behind this is that Druid currently can't make optimal resource-planning decisions by itself. (Optimal automatic resource allocation would, I think, at least require Druid to allocate resources based on the query's priority and how heavy its workload is.) So, users should be careful when allocating resources. If there aren't enough resources for this particular query, it means the user's resource planning failed and the query should fail.

} else if (numParallelCombineThreads == QueryContexts.NO_PARALLEL_COMBINE_THREADS) {
return sequentialMerge(sequencesByInterval);
} else {
throw new ISE(
Contributor

Suggest making the check simply

if (numParallelCombineThreads == QueryContexts.NO_PARALLEL_COMBINE_THREADS) {
  return sequentialMerge(sequencesByInterval);
}
final ReserveResult reserveResult = processingThreadResourcePool.reserve(query, numParallelCombineThreads);
if (reserveResult.isOk()) {
  return parallelMerge(sequencesByInterval, reserveResult.getResources());
} else {
  return sequentialMerge(sequencesByInterval);
}

and have processingThreadResourcePool.reserve handle the positive or negative cases (negative meaning all available, positive meaning no more than that number)

Contributor Author

I don't think this is a good change. The numParallelCombineThreads option is used only by CachingClusteredClient, so CachingClusteredClient should be responsible for checking whether the given option is valid.

@@ -40,9 +40,10 @@ public BaseSequence(
public <OutType> OutType accumulate(OutType initValue, final Accumulator<OutType, T> fn)
{
IterType iterator = maker.make();
OutType accumulated = initValue;
Contributor

much cleaner!

*/
public class ParallelMergeCombineSequence<T> extends YieldingSequenceBase<T>
{
private static final int MINIMUM_LEAF_COMBINE_DEGREE = 2;
Contributor

It is possible this is too aggressive. For example, merging 8x2, then 4x2, then 2x2, and then a final merge may not be worth it compared to simply merging all 16 in series. In my own tests I found that simply splitting into serial groups of 100, and doing a serial merge over the groups, worked pretty darn well. That's only two levels of merging.

Contributor Author

This is just a starting point for finding the proper combine degree for leaf nodes. If numBrokerParallelCombineThreads = 1, then a single thread will combine all input sequences.

Member

If the main CPU cost lies in the combine function, does it really matter? In the end, we make the same number of calls to this function, if I understand it right.

If the cost of the infrastructure does matter, the big wins will probably be:

  • Move off blocking and implement async value interchange between levels, just in this particular small part of the system. RxJava may already have the necessary utilities.
  • Optimize MergeSequence: don't remove and re-add elements at the head, just move the element up/down in the heap, as sketched below. This technique is already used in IntIteratorUtils.MergeIntIterator and MergingRowIterator.
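As a standalone illustration of that heap trick (not taken from IntIteratorUtils or MergingRowIterator, and written as a tiny custom heap because java.util.PriorityQueue does not expose a replace-head operation):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// When the top element is replaced with its next value, a single siftDown restores heap
// order, instead of the poll() + offer() pair that sifts the heap twice.
class SiftableHeap<T>
{
  private final List<T> heap = new ArrayList<>();
  private final Comparator<T> comparator;

  SiftableHeap(Comparator<T> comparator)
  {
    this.comparator = comparator;
  }

  void add(T element)
  {
    heap.add(element);
    int i = heap.size() - 1;
    // Sift the newly added element up to its place.
    while (i > 0) {
      int parent = (i - 1) / 2;
      if (comparator.compare(heap.get(i), heap.get(parent)) >= 0) {
        break;
      }
      swap(i, parent);
      i = parent;
    }
  }

  T peek()
  {
    return heap.get(0); // caller must ensure the heap is non-empty
  }

  // Replace the smallest element and restore heap order with one siftDown.
  void replaceHead(T newHead)
  {
    heap.set(0, newHead);
    siftDown(0);
  }

  private void siftDown(int i)
  {
    while (true) {
      int left = 2 * i + 1;
      int right = left + 1;
      int smallest = i;
      if (left < heap.size() && comparator.compare(heap.get(left), heap.get(smallest)) < 0) {
        smallest = left;
      }
      if (right < heap.size() && comparator.compare(heap.get(right), heap.get(smallest)) < 0) {
        smallest = right;
      }
      if (smallest == i) {
        return;
      }
      swap(i, smallest);
      i = smallest;
    }
  }

  private void swap(int i, int j)
  {
    T tmp = heap.get(i);
    heap.set(i, heap.get(j));
    heap.set(j, tmp);
  }
}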

Contributor Author

If the main CPU cost lies in the combine function, does it really matter? In the end, we make the same number of calls to this function, if I understand it right.

Good point. I think you're right. I'll test it.

Contributor

My point is, if you fork too much, there are scenarios where the maintenance of the forks exceeds the work done in the forks themselves. It isn't a problem until it's a problem, though, and if the macro benchmarks show an overall improvement I'm fine with real-deployment profiling to see whether the overhead is really there before trying to make any changes or tunings to the maximum number of combinations in a single pass.

return nextVal != null;
}
catch (InterruptedException e) {
throw new RuntimeException(e);
Contributor

Thread.currentThread().interrupt()

Contributor Author

Would you tell me why? Is there anything else that needs to handle the InterruptedException specially?

Contributor

Anywhere that catches an Exception that could be an InterruptedException, or that catches InterruptedException directly, will swallow the interrupted flag on the thread unless it restores it. You can search the code for usages of java.lang.Thread#interrupt for other places where this is done. It is a nasty latent bug source, since it can easily result in an interrupt being lost and a retry loop clogging up a shutdown or recovery cycle of some kind. There's a reason Scala includes InterruptedException in its list of fatal exceptions.
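The usual idiom, shown here as a generic sketch around a blocking take rather than the exact code in this PR:

try {
  nextVal = queue.take();
  return nextVal != null;
}
catch (InterruptedException e) {
  // Restore the flag so callers further up the stack can still observe the interrupt.
  Thread.currentThread().interrupt();
  throw new RuntimeException(e);
}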

Contributor Author

I know. My question is what's the point of the thread interrupting itself again? This exception is just thrown by the threads that run the runnable in the background.

private T nextVal;

@Override
public boolean hasNext()
Contributor

This should be true as long as next() is called up to sequenceList.size() times, right? Do you need to block during the hasNext() call rather than in the next() call?

Contributor Author

No, all values from the sequences in sequenceList are combined and stored in queue. hasNext() should return true until all values are aggregated and the queue becomes empty.
We can't block in next() since we can't know how many values will end up in the queue.

Contributor

I tried looking through the code base, and other code bases, for some direction on whether blocking in hasNext() is good or bad. I didn't find strong evidence either way. Luckily, iterators are commonly used in a loop like

while (it.hasNext())
{
  Object next = it.next();
}

But from an implementation-compliance standpoint, the following code should yield the same result:

while (it.hasNext() && it.hasNext() && it.hasNext() && it.hasNext() && it.hasNext())
{
  Object next = it.next();
}

I think that property is violated by this implementation.
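For what it's worth, a hedged sketch of a hasNext() that blocks on the queue but stays idempotent across repeated calls could look like this (field and sentinel names are assumed; this is not the PR's actual code):

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;

// hasNext() blocks until a combined value (or the end-of-stream sentinel) arrives, so calling
// it several times in a row behaves the same as calling it once.
class BlockingCombineIterator<T> implements Iterator<T>
{
  private final BlockingQueue<T> queue;  // filled by the combining threads
  private final T doneSentinel;          // hypothetical end-of-stream marker
  private T nextVal;
  private boolean done;

  BlockingCombineIterator(BlockingQueue<T> queue, T doneSentinel)
  {
    this.queue = queue;
    this.doneSentinel = doneSentinel;
  }

  @Override
  public boolean hasNext()
  {
    if (done) {
      return false;                 // stays false after exhaustion
    }
    if (nextVal != null) {
      return true;                  // a value fetched by an earlier hasNext() is still pending
    }
    try {
      final T polled = queue.take();
      if (polled == doneSentinel) {
        done = true;
        return false;
      }
      nextVal = polled;
      return true;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final T result = nextVal;
    nextVal = null;
    return result;
  }
}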

@jihoonson
Contributor Author

My main question about the implementation is as follows: there is already a large body of effort and thought in the java.lang.Thread* and java.util.concurrent packages that seems to be purposefully avoided here. I haven't been able to fully dig into ParallelMergeCombineSequence yet, but it seems like a lot of the standard Java concepts should be usable here. I'll try to give some examples when I can.

@drcrallen thank you for taking a look. I'll check your comments soon. On this question: if you're talking about ForkJoinPool, I don't think it's appropriate for this kind of use case. The algorithm implemented in this PR is a sort of pipelined combine of sorted data. Each thread combines data from its children, and a parent and its child threads can run at the same time. I took this approach because the final results should be streamed to the query client.

However, I think ForkJoinPool is appropriate for divide-and-conquer style algorithms. For example, it's useful when summing a large sequence of integers. It can automatically split the sequence into several small chunks and sum them in parallel. The results of summing the small chunks are then aggregated again. Here, summing the small chunks and aggregating their results can't be done at the same time. What do you think?
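For reference, that divide-and-conquer style looks roughly like this textbook sketch (not code from this PR):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Split the array until chunks are small, sum the chunks in parallel, then add the partial sums.
class SumTask extends RecursiveTask<Long>
{
  private static final int THRESHOLD = 10_000;

  private final long[] values;
  private final int from;
  private final int to;

  SumTask(long[] values, int from, int to)
  {
    this.values = values;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute()
  {
    if (to - from <= THRESHOLD) {
      long sum = 0;
      for (int i = from; i < to; i++) {
        sum += values[i];
      }
      return sum;
    }
    int mid = (from + to) >>> 1;
    SumTask left = new SumTask(values, from, mid);
    SumTask right = new SumTask(values, mid, to);
    left.fork();                          // run the left half asynchronously
    return right.compute() + left.join(); // compute the right half here, then combine
  }
}

// Usage: long total = ForkJoinPool.commonPool().invoke(new SumTask(values, 0, values.length));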

pQueue,
PriorityQueue<Yielder<T>> pQueue = baseSequences.accumulate(
new PriorityQueue<>(
32,
Contributor

If anyone finds this comment through code archaeology: this constant was just me poking around and trying stuff. It is not thoroughly tested :(

Contributor Author

Thanks. Good to know.

@jihoonson
Contributor Author

@drcrallen thanks for the review, but I'll change my implementation based on the discussion with @leventov. (See #6629 (comment).) I'll probably close this and raise another PR.

@jihoonson
Contributor Author

I'm closing this PR. Will raise a new one later.
