Skip to content

Commit

Permalink
CCS: skip empty search hits when minimizing round-trips (elastic#40098)
Browse files Browse the repository at this point in the history
When minimizing round-trips, each cluster returns its own independent
search response. In case sort by field and/or field collapsing were
requested, when one cluster has no results to return, the information
about the field that sorting was based on (SortField array) as well as
the field (and the values) that collapsing was performed on are missing
in the search response. That causes problems as we can't build the
proper `TopDocs` instance which would need to be either `TopFieldDocs`
or `CollapseTopFieldDocs`. The merge routine expects that all the top
docs are of the same exact type which can't be guaranteed. Given that
the problematic results are empty, hence have no impact on the final
results, we can simply skip them.

Relates to elastic#32125
Closes elastic#40067
  • Loading branch information
javanna committed Mar 19, 2019
1 parent f0e15f7 commit 3b67c1a
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,23 @@ SearchResponse getMergedResponse(Clusters clusters) {
assert trackTotalHits == null || trackTotalHits;
trackTotalHits = true;
}

TopDocs topDocs = searchHitsToTopDocs(searchHits, totalHits, shards);
topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore()),
searchResponse.isTimedOut(), searchResponse.isTerminatedEarly());
topDocsList.add(topDocs);
if (searchHits.getHits().length > 0) {
//there is no point in adding empty search hits and merging them with the others. Also, empty search hits always come
//without sort fields and collapse info, despite sort by field and/or field collapsing was requested, which causes
//issues reconstructing the proper TopDocs instance and breaks mergeTopDocs which expects the same type for each result.
topDocsList.add(topDocs);
}
}

//after going through all the hits and collecting all their distinct shards, we can assign shardIndex and set it to the ScoreDocs
//after going through all the hits and collecting all their distinct shards, we assign shardIndex and set it to the ScoreDocs
setTopDocsShardIndex(shards, topDocsList);
setSuggestShardIndex(shards, groupedSuggestions);
TopDocs topDocs = mergeTopDocs(topDocsList, size, from);
SearchHits mergedSearchHits = topDocsToSearchHits(topDocs, topDocsStats);
setSuggestShardIndex(shards, groupedSuggestions);
Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true));
ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY);
Expand Down Expand Up @@ -330,12 +336,17 @@ private static void assignShardIndex(Map<ShardIdAndClusterAlias, Integer> shards
}

private static SearchHits topDocsToSearchHits(TopDocs topDocs, TopDocsStats topDocsStats) {
SearchHit[] searchHits = new SearchHit[topDocs.scoreDocs.length];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i];
searchHits[i] = scoreDoc.searchHit;
SearchHit[] searchHits;
if (topDocs == null) {
//merged TopDocs is null whenever all clusters have returned empty hits
searchHits = new SearchHit[0];
} else {
searchHits = new SearchHit[topDocs.scoreDocs.length];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i];
searchHits[i] = scoreDoc.searchHit;
}
}

SortField[] sortFields = null;
String collapseField = null;
Object[] collapseValues = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,72 @@ public void testMergeNoResponsesAdded() {
assertEquals(0, response.getShardFailures().length);
}

public void testMergeEmptySearchHitsWithNonEmpty() {
long currentRelativeTime = randomLong();
final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime);
SearchResponseMerger merger = new SearchResponseMerger(0, 10, Integer.MAX_VALUE, timeProvider, flag -> null);
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
int numFields = randomIntBetween(1, 3);
SortField[] sortFields = new SortField[numFields];
for (int i = 0; i < numFields; i++) {
sortFields[i] = new SortField("field-" + i, SortField.Type.INT, randomBoolean());
}
PriorityQueue<SearchHit> priorityQueue = new PriorityQueue<>(new SearchHitComparator(sortFields));
SearchHit[] hits = randomSearchHitArray(10, 1, "remote", new Index[]{new Index("index", "uuid")}, Float.NaN, 1,
sortFields, priorityQueue);
{
SearchHits searchHits = new SearchHits(hits, new TotalHits(10, TotalHits.Relation.EQUAL_TO), Float.NaN, sortFields, null, null);
InternalSearchResponse response = new InternalSearchResponse(searchHits, null, null, null, false, false, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 1L,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
merger.add(searchResponse);
}
{
SearchHits empty = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN, null, null, null);
InternalSearchResponse response = new InternalSearchResponse(empty, null, null, null, false, false, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 1L,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
merger.add(searchResponse);
}
assertEquals(2, merger.numResponses());
SearchResponse mergedResponse = merger.getMergedResponse(clusters);
assertEquals(10, mergedResponse.getHits().getTotalHits().value);
assertEquals(10, mergedResponse.getHits().getHits().length);
assertEquals(2, mergedResponse.getTotalShards());
assertEquals(2, mergedResponse.getSuccessfulShards());
assertEquals(0, mergedResponse.getSkippedShards());
assertArrayEquals(sortFields, mergedResponse.getHits().getSortFields());
assertArrayEquals(hits, mergedResponse.getHits().getHits());
assertEquals(clusters, mergedResponse.getClusters());
}

public void testMergeOnlyEmptyHits() {
long currentRelativeTime = randomLong();
final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime);
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
Tuple<Integer, TotalHits.Relation> randomTrackTotalHits = randomTrackTotalHits();
int trackTotalHitsUpTo = randomTrackTotalHits.v1();
TotalHits.Relation totalHitsRelation = randomTrackTotalHits.v2();
SearchResponseMerger merger = new SearchResponseMerger(0, 10, trackTotalHitsUpTo, timeProvider, flag -> null);
int numResponses = randomIntBetween(1, 5);
TotalHits expectedTotalHits = null;
for (int i = 0; i < numResponses; i++) {
TotalHits totalHits = null;
if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
totalHits = new TotalHits(randomLongBetween(0, 1000), totalHitsRelation);
long previousValue = expectedTotalHits == null ? 0 : expectedTotalHits.value;
expectedTotalHits = new TotalHits(Math.min(previousValue + totalHits.value, trackTotalHitsUpTo), totalHitsRelation);
}
SearchHits empty = new SearchHits(new SearchHit[0], totalHits, Float.NaN, null, null, null);
InternalSearchResponse response = new InternalSearchResponse(empty, null, null, null, false, false, 1);
SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 1L,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
merger.add(searchResponse);
}
SearchResponse mergedResponse = merger.getMergedResponse(clusters);
assertEquals(expectedTotalHits, mergedResponse.getHits().getTotalHits());
}

private static Tuple<Integer, TotalHits.Relation> randomTrackTotalHits() {
switch(randomIntBetween(0, 2)) {
case 0:
Expand Down

0 comments on commit 3b67c1a

Please sign in to comment.