Skip to content

Commit

Permalink
Add copy constructor to SearchRequest (#36641)
Browse files Browse the repository at this point in the history
For cross cluster search alternate execution mode (see #32125), we will need to take a search request that spans across multiple clusters (based on index prefixes e.g. cluster1:index, cluster2:index etc.) and split it into multiple search requests to be sent to each cluster. A copy constructor added to `SearchRequest` would make that easy and well maintainable in the future.

Something along the same lines already happens in `BulkByScrollParallelizationHelper`, but the corresponding code went outdated as some new fields were added to `SearchRequest` which were not added to the bulk by scroll code. A copy constructor helps making the task of copying a search request maintainable over time.
  • Loading branch information
javanna committed Dec 18, 2018
1 parent e939f3d commit fa0c774
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,9 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field,
}
slicedSource = request.source().copyWithNewSlice(sliceBuilder);
}
slices[slice] = new SearchRequest()
.source(slicedSource)
.searchType(request.searchType())
.indices(request.indices())
.types(request.types())
.routing(request.routing())
.preference(request.preference())
.requestCache(request.requestCache())
.scroll(request.scroll())
.indicesOptions(request.indicesOptions());
if (request.allowPartialSearchResults() != null) {
slices[slice].allowPartialSearchResults(request.allowPartialSearchResults());
}
SearchRequest searchRequest = new SearchRequest(request);
searchRequest.source(slicedSource);
slices[slice] = searchRequest;
}
return slices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private Boolean requestCache;

private Boolean allowPartialSearchResults;



private Scroll scroll;

private int batchedReduceSize = DEFAULT_BATCHED_REDUCE_SIZE;
Expand All @@ -98,6 +97,25 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public SearchRequest() {
}

/**
* Constructs a new search request from the provided search request
*/
public SearchRequest(SearchRequest searchRequest) {
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults;
this.batchedReduceSize = searchRequest.batchedReduceSize;
this.indices = searchRequest.indices;
this.indicesOptions = searchRequest.indicesOptions;
this.maxConcurrentShardRequests = searchRequest.maxConcurrentShardRequests;
this.preference = searchRequest.preference;
this.preFilterShardSize = searchRequest.preFilterShardSize;
this.requestCache = searchRequest.requestCache;
this.routing = searchRequest.routing;
this.scroll = searchRequest.scroll;
this.searchType = searchRequest.searchType;
this.source = searchRequest.source;
this.types = searchRequest.types;
}

/**
* Constructs a new search request against the indices. No indices provided here means that search
* will run against all indices.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@

package org.elasticsearch.search;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -42,16 +41,10 @@ public class SearchRequestTests extends AbstractSearchTestCase {

public void testSerialization() throws Exception {
SearchRequest searchRequest = createSearchRequest();
try (BytesStreamOutput output = new BytesStreamOutput()) {
searchRequest.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
SearchRequest deserializedRequest = new SearchRequest();
deserializedRequest.readFrom(in);
assertEquals(deserializedRequest, searchRequest);
assertEquals(deserializedRequest.hashCode(), searchRequest.hashCode());
assertNotSame(deserializedRequest, searchRequest);
}
}
SearchRequest deserializedRequest = copyStreamable(searchRequest, namedWriteableRegistry, SearchRequest::new, Version.CURRENT);
assertEquals(deserializedRequest, searchRequest);
assertEquals(deserializedRequest.hashCode(), searchRequest.hashCode());
assertNotSame(deserializedRequest, searchRequest);
}

public void testIllegalArguments() {
Expand Down Expand Up @@ -139,11 +132,19 @@ public void testValidate() throws IOException {

}

public void testCopyConstructor() throws IOException {
SearchRequest searchRequest = createSearchRequest();
SearchRequest deserializedRequest = copyStreamable(searchRequest, namedWriteableRegistry, SearchRequest::new, Version.CURRENT);
assertEquals(deserializedRequest, searchRequest);
assertEquals(deserializedRequest.hashCode(), searchRequest.hashCode());
assertNotSame(deserializedRequest, searchRequest);
}

public void testEqualsAndHashcode() throws IOException {
checkEqualsAndHashCode(createSearchRequest(), SearchRequestTests::copyRequest, this::mutate);
}

private SearchRequest mutate(SearchRequest searchRequest) throws IOException {
private SearchRequest mutate(SearchRequest searchRequest) {
SearchRequest mutation = copyRequest(searchRequest);
List<Runnable> mutators = new ArrayList<>();
mutators.add(() -> mutation.indices(ArrayUtils.concat(searchRequest.indices(), new String[] { randomAlphaOfLength(10) })));
Expand All @@ -152,7 +153,7 @@ private SearchRequest mutate(SearchRequest searchRequest) throws IOException {
mutators.add(() -> mutation.types(ArrayUtils.concat(searchRequest.types(), new String[] { randomAlphaOfLength(10) })));
mutators.add(() -> mutation.preference(randomValueOtherThan(searchRequest.preference(), () -> randomAlphaOfLengthBetween(3, 10))));
mutators.add(() -> mutation.routing(randomValueOtherThan(searchRequest.routing(), () -> randomAlphaOfLengthBetween(3, 10))));
mutators.add(() -> mutation.requestCache((randomValueOtherThan(searchRequest.requestCache(), () -> randomBoolean()))));
mutators.add(() -> mutation.requestCache((randomValueOtherThan(searchRequest.requestCache(), ESTestCase::randomBoolean))));
mutators.add(() -> mutation
.scroll(randomValueOtherThan(searchRequest.scroll(), () -> new Scroll(new TimeValue(randomNonNegativeLong() % 100000)))));
mutators.add(() -> mutation.searchType(randomValueOtherThan(searchRequest.searchType(),
Expand All @@ -162,20 +163,7 @@ private SearchRequest mutate(SearchRequest searchRequest) throws IOException {
return mutation;
}

private static SearchRequest copyRequest(SearchRequest searchRequest) throws IOException {
SearchRequest result = new SearchRequest();
result.indices(searchRequest.indices());
result.indicesOptions(searchRequest.indicesOptions());
result.types(searchRequest.types());
result.searchType(searchRequest.searchType());
result.preference(searchRequest.preference());
result.routing(searchRequest.routing());
result.requestCache(searchRequest.requestCache());
result.allowPartialSearchResults(searchRequest.allowPartialSearchResults());
result.scroll(searchRequest.scroll());
if (searchRequest.source() != null) {
result.source(searchRequest.source());
}
return result;
private static SearchRequest copyRequest(SearchRequest searchRequest) {
return new SearchRequest(searchRequest);
}
}

0 comments on commit fa0c774

Please sign in to comment.