Skip to content

Commit

Permalink
apache#3642 Still query blocks on indexing node
Browse files Browse the repository at this point in the history
navis committed Apr 1, 2021

Verified

This commit was signed with the committer’s verified signature. The key has expired.
mjcarroll Michael Carroll
1 parent d200a14 commit 434d405
Showing 7 changed files with 132 additions and 82 deletions.
41 changes: 37 additions & 4 deletions common/src/main/java/io/druid/concurrent/Execs.java
Original file line number Diff line number Diff line change
@@ -36,13 +36,13 @@
import io.druid.common.guava.GuavaUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.utils.Runnables;
import io.druid.utils.StopWatch;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -276,7 +276,12 @@ public static class ExecutorQueue<T> implements Closeable

public ExecutorQueue(int parallelism)
{
semaphore = new Semaphore(parallelism);
this(new Semaphore(parallelism));
}

public ExecutorQueue(Semaphore semaphore)
{
this.semaphore = semaphore;
closer = () -> semaphore.close();
}

@@ -323,6 +328,23 @@ public boolean acquire(WaitingFuture future)
return !future.isCancelled();
}

public boolean acquire(WaitingFuture future, StopWatch watch)
{
log.debug("> acquiring %s", name);
try {
if (watch != null) {
watch.acquire(semaphore);
} else {
semaphore.acquire();
}
}
catch (Exception e) {
return future.setException(e);
}
log.debug("< acquired %s", name);
return !future.isCancelled();
}

@Override
public void close()
{
@@ -371,13 +393,24 @@ public static <V> List<ListenableFuture<V>> execute(
final Semaphore semaphore,
final int priority
)
{
return execute(executor, works, semaphore, null, priority);
}

public static <V> List<ListenableFuture<V>> execute(
final ExecutorService executor,
final Iterable<Callable<V>> works,
final Semaphore semaphore,
final StopWatch watch,
final int priority
)
{
final int parallelism = semaphore.availablePermits();
Preconditions.checkArgument(parallelism > 0, "Invalid parallelism %d", parallelism);
log.debug("Executing with parallelism : %d", parallelism);
// must be materialized first
final List<WaitingFuture<V>> futures = GuavaUtils.transform(works, WaitingFuture.<V>toWaiter());
final Queue<WaitingFuture<V>> queue = new LinkedBlockingQueue<WaitingFuture<V>>(futures);
final BlockingQueue<WaitingFuture<V>> queue = new LinkedBlockingQueue<WaitingFuture<V>>(futures);
try {
for (int i = 0; i < parallelism; i++) {
executor.submit(
@@ -393,7 +426,7 @@ public int getPriority()
public void run()
{
for (WaitingFuture<V> work = queue.poll(); work != null; work = queue.poll()) {
if (!semaphore.acquire(work) || !work.execute()) {
if (!semaphore.acquire(work, watch) || !work.execute()) {
log.debug("Something wrong.. aborting"); // can be normal process
break;
}
26 changes: 20 additions & 6 deletions common/src/main/java/io/druid/utils/StopWatch.java
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@@ -36,7 +37,7 @@ public StopWatch(long timeout)

public <T> T wainOn(Future<T> future) throws TimeoutException, ExecutionException, InterruptedException
{
final long remaining = timeout - System.currentTimeMillis();
final long remaining = remaining();
if (remaining <= 0) {
throw new TimeoutException();
}
@@ -45,8 +46,7 @@ public <T> T wainOn(Future<T> future) throws TimeoutException, ExecutionExceptio

public <T> boolean enqueue(BlockingQueue<T> queue, T element) throws TimeoutException, InterruptedException
{
long remaining = timeout - System.currentTimeMillis();
for (; remaining > 0; remaining = timeout - System.currentTimeMillis()) {
for (long remaining = remaining(); remaining > 0; remaining = remaining()) {
if (queue.offer(element, remaining, TimeUnit.MILLISECONDS)) {
return true;
}
@@ -56,8 +56,7 @@ public <T> boolean enqueue(BlockingQueue<T> queue, T element) throws TimeoutExce

public <T> T dequeue(BlockingQueue<T> queue) throws TimeoutException, InterruptedException
{
long remaining = timeout - System.currentTimeMillis();
for (; remaining > 0; remaining = timeout - System.currentTimeMillis()) {
for (long remaining = remaining(); remaining > 0; remaining = remaining()) {
T poll = queue.poll(remaining, TimeUnit.MILLISECONDS);
if (poll != null) {
return poll;
@@ -66,8 +65,23 @@ public <T> T dequeue(BlockingQueue<T> queue) throws TimeoutException, Interrupte
throw new TimeoutException();
}

public boolean acquire(Semaphore semaphore) throws TimeoutException, InterruptedException
{
for (long remaining = remaining(); remaining > 0; remaining = remaining()) {
if (semaphore.tryAcquire(remaining, TimeUnit.MILLISECONDS)) {
return true;
}
}
throw new TimeoutException();
}

public long remaining()
{
return timeout - System.currentTimeMillis();
}

public boolean isExpired()
{
return timeout <= System.currentTimeMillis();
return remaining() <= 0;
}
}
2 changes: 1 addition & 1 deletion processing/src/main/java/io/druid/query/QueryConfig.java
Original file line number Diff line number Diff line change
@@ -98,7 +98,7 @@ public int getMaxResults(Query<?> query)
public int getQueryParallelism(Query<?> query)
{
final int systemMax = maxQueryParallelism;
final int userMax = query.getContextInt(Query.MAX_QUERY_PARALLELISM, 4);
final int userMax = query.getContextInt(Query.MAX_QUERY_PARALLELISM, QueryRunners.MAX_QUERY_PARALLELISM);
return Math.min(systemMax, userMax);
}

70 changes: 22 additions & 48 deletions processing/src/main/java/io/druid/query/QueryRunnerHelper.java
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import io.druid.cache.Cache;
import io.druid.common.guava.Sequence;
import io.druid.common.utils.Sequences;
@@ -88,61 +87,36 @@ public static <T> QueryRunner<T> toManagementRunner(
);
}

public static <T> Iterable<Callable<Sequence<T>>> asCallable(
final Iterable<QueryRunner<T>> runners,
public static <T> Callable<Sequence<T>> asCallable(
final QueryRunner<T> runner,
final Query<T> query,
final Map<String, Object> responseContext
)
{
return Iterables.transform(
runners,
new Function<QueryRunner<T>, Callable<Sequence<T>>>()
{
@Override
public Callable<Sequence<T>> apply(final QueryRunner<T> runner)
{
return new PrioritizedCallable.Background<Sequence<T>>()
{
@Override
public Sequence<T> call()
{
return runner.run(query, responseContext);
}
};
}
}
);
return new PrioritizedCallable.Background<Sequence<T>>()
{
@Override
public Sequence<T> call()
{
return runner.run(query, responseContext);
}
};
}

public static <T> Iterable<Callable<Sequence<T>>> asCallable(
final Iterable<QueryRunner<T>> runners,
final Execs.Semaphore semaphore,
public static <T> Callable<Sequence<T>> asCallable(
final QueryRunner<T> runner,
final Query<T> query,
final boolean materialze,
final Map<String, Object> responseContext
final Map<String, Object> responseContext,
final Execs.Semaphore semaphore
)
{
return Iterables.transform(
asCallable(runners, query, responseContext),
new Function<Callable<Sequence<T>>, Callable<Sequence<T>>>()
{
@Override
public Callable<Sequence<T>> apply(final Callable<Sequence<T>> callable)
{
return new PrioritizedCallable.Background<Sequence<T>>()
{
@Override
public Sequence<T> call() throws Exception
{
Sequence<T> sequence = Sequences.withBaggage(callable.call(), semaphore);
if (materialze) {
sequence = Sequences.materialize(sequence);
}
return sequence;
}
};
}
}
);
return new PrioritizedCallable.Background<Sequence<T>>()
{
@Override
public Sequence<T> call() throws Exception
{
return Sequences.materialize(Sequences.withBaggage(runner.run(query, responseContext), semaphore));
}
};
}
}
47 changes: 30 additions & 17 deletions processing/src/main/java/io/druid/query/QueryRunners.java
Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.common.guava.FutureSequence;
import io.druid.common.guava.Sequence;
import io.druid.common.utils.Sequences;
import io.druid.concurrent.Execs;
@@ -49,7 +48,7 @@

public class QueryRunners
{
public static final int MAX_QUERY_PARALLELISM = 4;
public static final int MAX_QUERY_PARALLELISM = 4; // todo: use QueryConfig.maxQueryParallelism instead

private static final Logger LOG = new Logger(QueryRunners.class);

@@ -223,7 +222,7 @@ public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
final List<String> columns = query.estimatedInitialColumns();
final Comparator<T> ordering = query.getMergeOrdering(columns);
// used for limiting resource usage from heavy aggregators like CountMinSketch
final int parallelism = query.getContextInt(Query.MAX_QUERY_PARALLELISM, MAX_QUERY_PARALLELISM);
final int parallelism = Math.min(runners.size(), watcher.getQueryConfig().getQueryParallelism(query));
if (parallelism < 1 || Execs.isDirectExecutor(executor)) {
// no limit.. todo: deprecate this
return new ChainedExecutionQueryRunner<T>(executor, watcher, runners)
@@ -235,30 +234,44 @@ protected Comparator<T> getMergeOrdering(Query<T> query)
}
};
}
final int priority = BaseQuery.getContextPriority(query, 0);
final Execs.Semaphore semaphore = new Execs.Semaphore(parallelism);
if (ordering == null) {
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
final Execs.ExecutorQueue<Sequence<T>> queue = new Execs.ExecutorQueue<>(semaphore);
for (QueryRunner<T> runner : runners) {
queue.add(QueryRunnerHelper.asCallable(runner, query, responseContext));
}
final List<ListenableFuture<Sequence<T>>> futures = queue.execute(executor, priority);
final Closeable resource = () -> {
queue.close();
Execs.cancelQuietly(Futures.allAsList(futures));
};
final StopWatch watch = watcher.register(query, Futures.allAsList(futures), resource);
return Sequences.concat(columns, Iterables.transform(futures, future -> QueryRunners.waitOn(future, watch)));
}
};
}
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
final int priority = BaseQuery.getContextPriority(query, 0);
final Execs.Semaphore semaphore = new Execs.Semaphore(Math.min(parallelism, runners.size()));
final Iterable<Callable<Sequence<T>>> works = QueryRunnerHelper.asCallable(
runners, semaphore, query, ordering != null, responseContext
final Iterable<Callable<Sequence<T>>> works = Iterables.transform(
runners, runner -> QueryRunnerHelper.asCallable(runner, query, responseContext, semaphore)
);
final StopWatch watch = new StopWatch(watcher.remainingTime(query.getId()));
final ListenableFuture<List<Sequence<T>>> future = Futures.allAsList(
Execs.execute(executor, works, semaphore, watch, priority)
);
final List<ListenableFuture<Sequence<T>>> futures = Execs.execute(executor, works, semaphore, priority);
final ListenableFuture<List<Sequence<T>>> future = Futures.allAsList(futures);
final Closeable resource = () -> {
semaphore.destroy();
Execs.cancelQuietly(future);
};
if (ordering == null) {
final ListenableFuture<Sequence<T>> first = futures.get(0);
final List<ListenableFuture<Sequence<T>>> others = futures.subList(1, futures.size());
final Sequence<T> sequence = waitForCompletion(query, first, watcher, resource);
return sequence == null ? Sequences.withBaggage(Sequences.empty(columns), resource) :
Sequences.withBaggage(Sequences.concat(sequence, Sequences.concat(
Iterables.transform(others, FutureSequence.toSequence(sequence.columns())))), resource);
}
final List<Sequence<T>> sequences = waitForCompletion(query, future, watcher, resource);
return sequences == null ? Sequences.withBaggage(Sequences.empty(columns), resource) :
Sequences.withBaggage(QueryUtils.mergeSort(columns, ordering, sequences), resource);
10 changes: 10 additions & 0 deletions processing/src/main/java/io/druid/query/QueryWatcher.java
Original file line number Diff line number Diff line change
@@ -64,8 +64,12 @@ default StopWatch register(Query query, ListenableFuture future)

boolean isTimedOut(String queryId);

QueryConfig getQueryConfig();

class Abstract implements QueryWatcher
{
private static final QueryConfig DUMMY = new QueryConfig();

@Override
public StopWatch register(Query query, ListenableFuture future, Closeable resource)
{
@@ -89,5 +93,11 @@ public void finished(String queryId) {}

@Override
public boolean isTimedOut(String queryId) { return false;}

@Override
public QueryConfig getQueryConfig()
{
return DUMMY;
}
}
}
Loading

0 comments on commit 434d405

Please sign in to comment.