From 434d405373ea214246810cac0bc7bfc6c8e910a4 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Thu, 1 Apr 2021 14:42:12 +0900 Subject: [PATCH] #3642 Still query blocks on indexing node --- .../main/java/io/druid/concurrent/Execs.java | 41 +++++++++-- .../main/java/io/druid/utils/StopWatch.java | 26 +++++-- .../main/java/io/druid/query/QueryConfig.java | 2 +- .../io/druid/query/QueryRunnerHelper.java | 70 ++++++------------- .../java/io/druid/query/QueryRunners.java | 47 ++++++++----- .../java/io/druid/query/QueryWatcher.java | 10 +++ .../java/io/druid/server/QueryManager.java | 18 +++-- 7 files changed, 132 insertions(+), 82 deletions(-) diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java index 18e9f92341bb..18e0e2600dc5 100644 --- a/common/src/main/java/io/druid/concurrent/Execs.java +++ b/common/src/main/java/io/druid/concurrent/Execs.java @@ -36,13 +36,13 @@ import io.druid.common.guava.GuavaUtils; import io.druid.java.util.common.logger.Logger; import io.druid.utils.Runnables; +import io.druid.utils.StopWatch; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.io.Closeable; import java.io.IOException; import java.util.List; -import java.util.Queue; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -276,7 +276,12 @@ public static class ExecutorQueue implements Closeable public ExecutorQueue(int parallelism) { - semaphore = new Semaphore(parallelism); + this(new Semaphore(parallelism)); + } + + public ExecutorQueue(Semaphore semaphore) + { + this.semaphore = semaphore; closer = () -> semaphore.close(); } @@ -323,6 +328,23 @@ public boolean acquire(WaitingFuture future) return !future.isCancelled(); } + public boolean acquire(WaitingFuture future, StopWatch watch) + { + log.debug("> acquiring %s", name); + try { + if (watch != null) { + watch.acquire(semaphore); + } else { + semaphore.acquire(); + } + } + catch (Exception e) { + return future.setException(e); + } + log.debug("< acquired %s", name); + return !future.isCancelled(); + } + @Override public void close() { @@ -371,13 +393,24 @@ public static List> execute( final Semaphore semaphore, final int priority ) + { + return execute(executor, works, semaphore, null, priority); + } + + public static List> execute( + final ExecutorService executor, + final Iterable> works, + final Semaphore semaphore, + final StopWatch watch, + final int priority + ) { final int parallelism = semaphore.availablePermits(); Preconditions.checkArgument(parallelism > 0, "Invalid parallelism %d", parallelism); log.debug("Executing with parallelism : %d", parallelism); // must be materialized first final List> futures = GuavaUtils.transform(works, WaitingFuture.toWaiter()); - final Queue> queue = new LinkedBlockingQueue>(futures); + final BlockingQueue> queue = new LinkedBlockingQueue>(futures); try { for (int i = 0; i < parallelism; i++) { executor.submit( @@ -393,7 +426,7 @@ public int getPriority() public void run() { for (WaitingFuture work = queue.poll(); work != null; work = queue.poll()) { - if (!semaphore.acquire(work) || !work.execute()) { + if (!semaphore.acquire(work, watch) || !work.execute()) { log.debug("Something wrong.. aborting"); // can be normal process break; } diff --git a/common/src/main/java/io/druid/utils/StopWatch.java b/common/src/main/java/io/druid/utils/StopWatch.java index 4f402ad234a4..7a696f382a30 100644 --- a/common/src/main/java/io/druid/utils/StopWatch.java +++ b/common/src/main/java/io/druid/utils/StopWatch.java @@ -22,6 +22,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -36,7 +37,7 @@ public StopWatch(long timeout) public T wainOn(Future future) throws TimeoutException, ExecutionException, InterruptedException { - final long remaining = timeout - System.currentTimeMillis(); + final long remaining = remaining(); if (remaining <= 0) { throw new TimeoutException(); } @@ -45,8 +46,7 @@ public T wainOn(Future future) throws TimeoutException, ExecutionExceptio public boolean enqueue(BlockingQueue queue, T element) throws TimeoutException, InterruptedException { - long remaining = timeout - System.currentTimeMillis(); - for (; remaining > 0; remaining = timeout - System.currentTimeMillis()) { + for (long remaining = remaining(); remaining > 0; remaining = remaining()) { if (queue.offer(element, remaining, TimeUnit.MILLISECONDS)) { return true; } @@ -56,8 +56,7 @@ public boolean enqueue(BlockingQueue queue, T element) throws TimeoutExce public T dequeue(BlockingQueue queue) throws TimeoutException, InterruptedException { - long remaining = timeout - System.currentTimeMillis(); - for (; remaining > 0; remaining = timeout - System.currentTimeMillis()) { + for (long remaining = remaining(); remaining > 0; remaining = remaining()) { T poll = queue.poll(remaining, TimeUnit.MILLISECONDS); if (poll != null) { return poll; @@ -66,8 +65,23 @@ public T dequeue(BlockingQueue queue) throws TimeoutException, Interrupte throw new TimeoutException(); } + public boolean acquire(Semaphore semaphore) throws TimeoutException, InterruptedException + { + for (long remaining = remaining(); remaining > 0; remaining = remaining()) { + if (semaphore.tryAcquire(remaining, TimeUnit.MILLISECONDS)) { + return true; + } + } + throw new TimeoutException(); + } + + public long remaining() + { + return timeout - System.currentTimeMillis(); + } + public boolean isExpired() { - return timeout <= System.currentTimeMillis(); + return remaining() <= 0; } } diff --git a/processing/src/main/java/io/druid/query/QueryConfig.java b/processing/src/main/java/io/druid/query/QueryConfig.java index 6eef21ac8f2a..cbc5d0b66218 100644 --- a/processing/src/main/java/io/druid/query/QueryConfig.java +++ b/processing/src/main/java/io/druid/query/QueryConfig.java @@ -98,7 +98,7 @@ public int getMaxResults(Query query) public int getQueryParallelism(Query query) { final int systemMax = maxQueryParallelism; - final int userMax = query.getContextInt(Query.MAX_QUERY_PARALLELISM, 4); + final int userMax = query.getContextInt(Query.MAX_QUERY_PARALLELISM, QueryRunners.MAX_QUERY_PARALLELISM); return Math.min(systemMax, userMax); } diff --git a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java index 65835c511542..840e70606f13 100644 --- a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java +++ b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.collect.Iterables; import io.druid.cache.Cache; import io.druid.common.guava.Sequence; import io.druid.common.utils.Sequences; @@ -88,61 +87,36 @@ public static QueryRunner toManagementRunner( ); } - public static Iterable>> asCallable( - final Iterable> runners, + public static Callable> asCallable( + final QueryRunner runner, final Query query, final Map responseContext ) { - return Iterables.transform( - runners, - new Function, Callable>>() - { - @Override - public Callable> apply(final QueryRunner runner) - { - return new PrioritizedCallable.Background>() - { - @Override - public Sequence call() - { - return runner.run(query, responseContext); - } - }; - } - } - ); + return new PrioritizedCallable.Background>() + { + @Override + public Sequence call() + { + return runner.run(query, responseContext); + } + }; } - public static Iterable>> asCallable( - final Iterable> runners, - final Execs.Semaphore semaphore, + public static Callable> asCallable( + final QueryRunner runner, final Query query, - final boolean materialze, - final Map responseContext + final Map responseContext, + final Execs.Semaphore semaphore ) { - return Iterables.transform( - asCallable(runners, query, responseContext), - new Function>, Callable>>() - { - @Override - public Callable> apply(final Callable> callable) - { - return new PrioritizedCallable.Background>() - { - @Override - public Sequence call() throws Exception - { - Sequence sequence = Sequences.withBaggage(callable.call(), semaphore); - if (materialze) { - sequence = Sequences.materialize(sequence); - } - return sequence; - } - }; - } - } - ); + return new PrioritizedCallable.Background>() + { + @Override + public Sequence call() throws Exception + { + return Sequences.materialize(Sequences.withBaggage(runner.run(query, responseContext), semaphore)); + } + }; } } diff --git a/processing/src/main/java/io/druid/query/QueryRunners.java b/processing/src/main/java/io/druid/query/QueryRunners.java index d328cddeb68c..8fe0c4b84c31 100644 --- a/processing/src/main/java/io/druid/query/QueryRunners.java +++ b/processing/src/main/java/io/druid/query/QueryRunners.java @@ -26,7 +26,6 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.druid.common.guava.FutureSequence; import io.druid.common.guava.Sequence; import io.druid.common.utils.Sequences; import io.druid.concurrent.Execs; @@ -49,7 +48,7 @@ public class QueryRunners { - public static final int MAX_QUERY_PARALLELISM = 4; + public static final int MAX_QUERY_PARALLELISM = 4; // todo: use QueryConfig.maxQueryParallelism instead private static final Logger LOG = new Logger(QueryRunners.class); @@ -223,7 +222,7 @@ public Sequence run(Query query, Map responseContext) final List columns = query.estimatedInitialColumns(); final Comparator ordering = query.getMergeOrdering(columns); // used for limiting resource usage from heavy aggregators like CountMinSketch - final int parallelism = query.getContextInt(Query.MAX_QUERY_PARALLELISM, MAX_QUERY_PARALLELISM); + final int parallelism = Math.min(runners.size(), watcher.getQueryConfig().getQueryParallelism(query)); if (parallelism < 1 || Execs.isDirectExecutor(executor)) { // no limit.. todo: deprecate this return new ChainedExecutionQueryRunner(executor, watcher, runners) @@ -235,30 +234,44 @@ protected Comparator getMergeOrdering(Query query) } }; } + final int priority = BaseQuery.getContextPriority(query, 0); + final Execs.Semaphore semaphore = new Execs.Semaphore(parallelism); + if (ordering == null) { + return new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + final Execs.ExecutorQueue> queue = new Execs.ExecutorQueue<>(semaphore); + for (QueryRunner runner : runners) { + queue.add(QueryRunnerHelper.asCallable(runner, query, responseContext)); + } + final List>> futures = queue.execute(executor, priority); + final Closeable resource = () -> { + queue.close(); + Execs.cancelQuietly(Futures.allAsList(futures)); + }; + final StopWatch watch = watcher.register(query, Futures.allAsList(futures), resource); + return Sequences.concat(columns, Iterables.transform(futures, future -> QueryRunners.waitOn(future, watch))); + } + }; + } return new QueryRunner() { @Override public Sequence run(Query query, Map responseContext) { - final int priority = BaseQuery.getContextPriority(query, 0); - final Execs.Semaphore semaphore = new Execs.Semaphore(Math.min(parallelism, runners.size())); - final Iterable>> works = QueryRunnerHelper.asCallable( - runners, semaphore, query, ordering != null, responseContext + final Iterable>> works = Iterables.transform( + runners, runner -> QueryRunnerHelper.asCallable(runner, query, responseContext, semaphore) + ); + final StopWatch watch = new StopWatch(watcher.remainingTime(query.getId())); + final ListenableFuture>> future = Futures.allAsList( + Execs.execute(executor, works, semaphore, watch, priority) ); - final List>> futures = Execs.execute(executor, works, semaphore, priority); - final ListenableFuture>> future = Futures.allAsList(futures); final Closeable resource = () -> { semaphore.destroy(); Execs.cancelQuietly(future); }; - if (ordering == null) { - final ListenableFuture> first = futures.get(0); - final List>> others = futures.subList(1, futures.size()); - final Sequence sequence = waitForCompletion(query, first, watcher, resource); - return sequence == null ? Sequences.withBaggage(Sequences.empty(columns), resource) : - Sequences.withBaggage(Sequences.concat(sequence, Sequences.concat( - Iterables.transform(others, FutureSequence.toSequence(sequence.columns())))), resource); - } final List> sequences = waitForCompletion(query, future, watcher, resource); return sequences == null ? Sequences.withBaggage(Sequences.empty(columns), resource) : Sequences.withBaggage(QueryUtils.mergeSort(columns, ordering, sequences), resource); diff --git a/processing/src/main/java/io/druid/query/QueryWatcher.java b/processing/src/main/java/io/druid/query/QueryWatcher.java index 9681c31df21e..681ef0a10167 100644 --- a/processing/src/main/java/io/druid/query/QueryWatcher.java +++ b/processing/src/main/java/io/druid/query/QueryWatcher.java @@ -64,8 +64,12 @@ default StopWatch register(Query query, ListenableFuture future) boolean isTimedOut(String queryId); + QueryConfig getQueryConfig(); + class Abstract implements QueryWatcher { + private static final QueryConfig DUMMY = new QueryConfig(); + @Override public StopWatch register(Query query, ListenableFuture future, Closeable resource) { @@ -89,5 +93,11 @@ public void finished(String queryId) {} @Override public boolean isTimedOut(String queryId) { return false;} + + @Override + public QueryConfig getQueryConfig() + { + return DUMMY; + } } } diff --git a/server/src/main/java/io/druid/server/QueryManager.java b/server/src/main/java/io/druid/server/QueryManager.java index 6e703ca51bfc..d10e17db0bc5 100644 --- a/server/src/main/java/io/druid/server/QueryManager.java +++ b/server/src/main/java/io/druid/server/QueryManager.java @@ -63,7 +63,7 @@ public class QueryManager implements QueryWatcher, Runnable private static final long DEFAULT_EXPIRE = 180_000; // 3 min private static final long LOG_THRESHOLD_MSEC = 200; - private final long maxQueryTimeout; + private final QueryConfig config; private final Map queries = Maps.newConcurrentMap(); private final ListeningExecutorService executor = Execs.newDirectExecutorService(); @@ -76,7 +76,13 @@ public QueryManager() @Inject public QueryManager(QueryConfig config) { - this.maxQueryTimeout = config.getMaxQueryTimeout(); + this.config = config; + } + + @Override + public QueryConfig getQueryConfig() + { + return config; } public void start(long intervalSec) @@ -132,10 +138,10 @@ public StopWatch register(final Query query, final ListenableFuture future, fina final List dataSources = query.getDataSource().getNames(); if (id == null) { LOG.warn("Query id for %s%s is null.. fix that", query.getType(), dataSources); - return new StopWatch(maxQueryTimeout); + return new StopWatch(config.getMaxQueryTimeout()); } final QueryStatus status = queries.computeIfAbsent( - id, k -> new QueryStatus(query.getType(), dataSources, BaseQuery.getTimeout(query, maxQueryTimeout)) + id, k -> new QueryStatus(query.getType(), dataSources, BaseQuery.getTimeout(query, config.getMaxQueryTimeout())) ); final long remaining = status.start(future, resource); future.addListener(() -> status.end(future), executor); @@ -159,10 +165,10 @@ public void unregister(Query query, Closeable resource) public long remainingTime(String queryId) { if (queryId == null) { - return maxQueryTimeout; // bug + return config.getMaxQueryTimeout(); // bug } final QueryStatus status = queries.get(queryId); // some internal queries? - return status == null ? maxQueryTimeout : status.remaining(); + return status == null ? config.getMaxQueryTimeout() : status.remaining(); } public List getQueryDatasources(final String queryId)