Skip to content

Commit

Permalink
Add support for merging multiple search responses into one (#37566)
Browse files Browse the repository at this point in the history
This will be used in cross-cluster search when reduction will be
performed locally on each cluster. The CCS coordinating node will send
one search request per remote cluster involved and will get one search
response back from each one of them. Such responses contain all the info
to be able to perform an additional reduction and return results back
to the user.

Relates to #32125
  • Loading branch information
javanna authored Jan 21, 2019
1 parent 14d74eb commit 09a6ba5
Show file tree
Hide file tree
Showing 8 changed files with 899 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Nullable;
Expand All @@ -43,7 +44,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand All @@ -70,7 +70,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
Expand All @@ -79,7 +79,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters) {
super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor);
Expand All @@ -103,8 +103,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
* Builds how long it took to execute the search.
*/
long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(
timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos());
return timeProvider.buildTookInMillis();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
if (queryResult.hasConsumedTopDocs() == false) { // already consumed?
final TopDocsAndMaxScore td = queryResult.consumeTopDocs();
assert td != null;
topDocsStats.add(td);
topDocsStats.add(td, queryResult.searchTimedOut(), queryResult.terminatedEarly());
// make sure we set the shard index before we add it - the consumer didn't do that yet
if (td.topDocs.scoreDocs.length > 0) {
setShardIndex(td.topDocs, queryResult.getShardIndex());
Expand Down Expand Up @@ -439,12 +439,10 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
boolean performFinalReduce) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
Expand Down Expand Up @@ -476,16 +474,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
QuerySearchResult result = entry.queryResult();
from = result.from();
size = result.size();
if (result.searchTimedOut()) {
timedOut = true;
}
if (result.terminatedEarly() != null) {
if (terminatedEarly == null) {
terminatedEarly = result.terminatedEarly();
} else if (result.terminatedEarly()) {
terminatedEarly = true;
}
}
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
Expand All @@ -508,8 +496,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
topDocsStats.timedOut, topDocsStats.terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

Expand Down Expand Up @@ -577,11 +565,7 @@ public static final class ReducedQueryPhase {
}
this.totalHits = totalHits;
this.fetchHits = fetchHits;
if (Float.isInfinite(maxScore)) {
this.maxScore = Float.NaN;
} else {
this.maxScore = maxScore;
}
this.maxScore = maxScore;
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.suggest = suggest;
Expand Down Expand Up @@ -682,7 +666,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
}
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs);
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
Expand Down Expand Up @@ -744,18 +728,20 @@ static final class TopDocsStats {
private long totalHits;
private TotalHits.Relation totalHitsRelation;
long fetchHits;
float maxScore = Float.NEGATIVE_INFINITY;

TopDocsStats() {
this(SearchContext.TRACK_TOTAL_HITS_ACCURATE);
}
private float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut;
Boolean terminatedEarly;

TopDocsStats(int trackTotalHitsUpTo) {
this.trackTotalHitsUpTo = trackTotalHitsUpTo;
this.totalHits = 0;
this.totalHitsRelation = Relation.EQUAL_TO;
}

float getMaxScore() {
return Float.isInfinite(maxScore) ? Float.NaN : maxScore;
}

TotalHits getTotalHits() {
if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
return null;
Expand All @@ -766,7 +752,7 @@ TotalHits getTotalHits() {
if (totalHits < trackTotalHitsUpTo) {
return new TotalHits(totalHits, totalHitsRelation);
} else {
/**
/*
* The user requested to count the total hits up to <code>trackTotalHitsUpTo</code>
* so we return this lower bound when the total hits is greater than this value.
* This can happen when multiple shards are merged since the limit to track total hits
Expand All @@ -777,7 +763,7 @@ TotalHits getTotalHits() {
}
}

void add(TopDocsAndMaxScore topDocs) {
void add(TopDocsAndMaxScore topDocs, boolean timedOut, Boolean terminatedEarly) {
if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
totalHits += topDocs.topDocs.totalHits.value;
if (topDocs.topDocs.totalHits.relation == Relation.GREATER_THAN_OR_EQUAL_TO) {
Expand All @@ -788,6 +774,16 @@ void add(TopDocsAndMaxScore topDocs) {
if (!Float.isNaN(topDocs.maxScore)) {
maxScore = Math.max(maxScore, topDocs.maxScore);
}
if (timedOut) {
this.timedOut = true;
}
if (terminatedEarly != null) {
if (this.terminatedEarly == null) {
this.terminatedEarly = terminatedEarly;
} else if (terminatedEarly) {
this.terminatedEarly = true;
}
}
}
}

Expand Down
Loading

0 comments on commit 09a6ba5

Please sign in to comment.