Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add finalReduce flag to SearchRequest (6.x) #38180

Merged
merged 2 commits into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -712,19 +712,19 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final boolean trackTotalHits = source == null || source.trackTotalHits();
final boolean finalReduce = request.getLocalClusterAlias() == null;

if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, finalReduce);
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
request.isFinalReduce());
}
}
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, finalReduce);
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, request.isFinalReduce());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

private String localClusterAlias;
private long absoluteStartMillis;
private boolean finalReduce;

private SearchType searchType = SearchType.DEFAULT;

Expand Down Expand Up @@ -102,6 +103,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
this.finalReduce = true;
}

/**
Expand All @@ -123,6 +125,7 @@ public SearchRequest(SearchRequest searchRequest) {
this.types = searchRequest.types;
this.localClusterAlias = searchRequest.localClusterAlias;
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
this.finalReduce = searchRequest.finalReduce;
}

/**
Expand All @@ -147,16 +150,18 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {

/**
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
* milliseconds from the epoch time and whether the reduction should be final or not. Used when a {@link SearchRequest} is created
* and executed as part of a cross-cluster search request performing reduction on each remote cluster. The coordinating CCS node
* provides the alias to prefix index names with in the returned search results, the current time to be used on the remote clusters
* to ensure that the same value is used, and determines whether the reduction phase should be final or not.
*/
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
SearchRequest(String localClusterAlias, long absoluteStartMillis, boolean finalReduce) {
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
if (absoluteStartMillis < 0) {
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
}
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
}

@Override
Expand Down Expand Up @@ -195,10 +200,17 @@ String getLocalClusterAlias() {
return localClusterAlias;
}

/**
* Returns whether the reduction phase that will be performed needs to be final or not.
*/
boolean isFinalReduce() {
return finalReduce;
}

/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
* request. When created through {@link #SearchRequest(String, long, boolean)}, this method returns the provided current time, otherwise
* it will return {@link System#currentTimeMillis()}.
*
*/
Expand Down Expand Up @@ -518,12 +530,15 @@ public void readFrom(StreamInput in) throws IOException {
localClusterAlias = in.readOptionalString();
if (localClusterAlias != null) {
absoluteStartMillis = in.readVLong();
finalReduce = in.readBoolean();
} else {
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
finalReduce = true;
}
} else {
localClusterAlias = null;
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
finalReduce = true;
}
}

Expand Down Expand Up @@ -554,6 +569,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(localClusterAlias);
if (localClusterAlias != null) {
out.writeVLong(absoluteStartMillis);
out.writeBoolean(finalReduce);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,13 @@ private static AtomicArray<SearchPhaseResult> generateFetchResults(int nShards,
return fetchResults;
}

private static SearchRequest randomSearchRequest() {
return randomBoolean() ? new SearchRequest() : new SearchRequest("remote", 0, randomBoolean());
}

public void testConsumer() {
int bufferSize = randomIntBetween(2, 3);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
Expand Down Expand Up @@ -366,7 +370,7 @@ public void testConsumerConcurrently() throws InterruptedException {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);

SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
Expand Down Expand Up @@ -410,7 +414,7 @@ public void testConsumerConcurrently() throws InterruptedException {
public void testConsumerOnlyAggs() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
Expand Down Expand Up @@ -444,7 +448,7 @@ public void testConsumerOnlyAggs() {
public void testConsumerOnlyHits() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
if (randomBoolean()) {
request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
}
Expand Down Expand Up @@ -475,8 +479,7 @@ public void testConsumerOnlyHits() {

private void assertFinalReduction(SearchRequest searchRequest) {
assertThat(reductions.size(), greaterThanOrEqualTo(1));
//the last reduction step was the final one only if no cluster alias was provided with the search request
assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1));
assertEquals(searchRequest.isFinalReduce(), reductions.get(reductions.size() - 1));
}

public void testNewSearchPhaseResults() {
Expand Down Expand Up @@ -548,7 +551,7 @@ public void testReduceTopNWithFromOffset() {
public void testConsumerSortByField() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
int size = randomIntBetween(1, 10);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
Expand Down Expand Up @@ -583,7 +586,7 @@ public void testConsumerSortByField() {
public void testConsumerFieldCollapsing() {
int expectedNumResults = randomIntBetween(30, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
SearchRequest request = randomSearchRequest();
int size = randomIntBetween(5, 10);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ protected SearchRequest createSearchRequest() throws IOException {
return super.createSearchRequest();
}
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong(), randomBoolean());
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
return searchRequest;
}

public void testClusterAliasValidation() {
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
SearchRequest searchRequest = new SearchRequest("", 0);
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0, randomBoolean()));
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1, randomBoolean()));
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
assertNull(searchRequest.validate());
}

Expand All @@ -79,9 +79,11 @@ public void testClusterAliasSerialization() throws IOException {
if (version.before(Version.V_6_7_0)) {
assertNull(deserializedRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
assertTrue(deserializedRequest.isFinalReduce());
} else {
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce());
}
}

Expand All @@ -94,6 +96,7 @@ public void testReadFromPre6_7_0() throws IOException {
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
assertNull(searchRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(searchRequest);
assertTrue(searchRequest.isFinalReduce());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,25 @@
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;

public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {

public void testLocalClusterAlias() {
long nowInMillis = System.currentTimeMillis();
long nowInMillis = randomLongBetween(0, Long.MAX_VALUE);
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
indexRequest.source("field", "value");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());

{
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
SearchRequest searchRequest = new SearchRequest("local", nowInMillis, randomBoolean());
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits());
SearchHit[] hits = searchResponse.getHits().getHits();
Expand All @@ -51,7 +55,7 @@ public void testLocalClusterAlias() {
assertEquals("1", hit.getId());
}
{
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
SearchRequest searchRequest = new SearchRequest("", nowInMillis, randomBoolean());
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits());
SearchHit[] hits = searchResponse.getHits().getHits();
Expand Down Expand Up @@ -90,19 +94,19 @@ public void testAbsoluteStartMillis() {
assertEquals(0, searchResponse.getTotalShards());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
searchRequest.indices("<test-{now/d}>");
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits());
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
rangeQuery.gte("1970-01-01");
Expand All @@ -114,4 +118,46 @@ public void testAbsoluteStartMillis() {
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
}

public void testFinalReduce() {
long nowInMillis = randomLongBetween(0, Long.MAX_VALUE);
{
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
indexRequest.source("price", 10);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
{
IndexRequest indexRequest = new IndexRequest("test", "type", "2");
indexRequest.source("price", 100);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
client().admin().indices().prepareRefresh("test").get();

SearchSourceBuilder source = new SearchSourceBuilder();
source.size(0);
TermsAggregationBuilder terms = new TermsAggregationBuilder("terms", ValueType.NUMERIC);
terms.field("price");
terms.size(1);
source.aggregation(terms);

{
SearchRequest searchRequest = randomBoolean() ? new SearchRequest().source(source)
: new SearchRequest("remote", nowInMillis, true).source(source);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits());
Aggregations aggregations = searchResponse.getAggregations();
LongTerms longTerms = aggregations.get("terms");
assertEquals(1, longTerms.getBuckets().size());
}
{
SearchRequest searchRequest = new SearchRequest("remote", nowInMillis, false).source(source);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits());
Aggregations aggregations = searchResponse.getAggregations();
LongTerms longTerms = aggregations.get("terms");
assertEquals(2, longTerms.getBuckets().size());
}
}
}