diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 1286e083d8203..a30fec41b0bf3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -399,6 +399,7 @@ private static void addSearchRequestParams(Params params, SearchRequest searchRe params.withPreference(searchRequest.preference()); params.withIndicesOptions(searchRequest.indicesOptions()); params.putParam("search_type", searchRequest.searchType().name().toLowerCase(Locale.ROOT)); + params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); if (searchRequest.requestCache() != null) { params.putParam("request_cache", Boolean.toString(searchRequest.requestCache())); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index f31c562332687..95971ad40ced0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -1239,7 +1239,7 @@ public void testMultiSearch() throws IOException { requests.add(searchRequest); }; MultiSearchRequest.readMultiLineFormat(new BytesArray(EntityUtils.toByteArray(request.getEntity())), - REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null, + REQUEST_BODY_CONTENT_TYPE.xContent(), consumer, null, multiSearchRequest.indicesOptions(), null, null, null, null, xContentRegistry(), true); assertEquals(requests, multiSearchRequest.requests()); } @@ -1862,6 +1862,10 @@ private static void setRandomSearchParams(SearchRequest searchRequest, searchRequest.scroll(randomTimeValue()); expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep()); } + if (randomBoolean()) { + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + } + expectedParams.put("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); } static void setRandomIndicesOptions(Consumer setter, Supplier getter, diff --git a/docs/reference/modules/cross-cluster-search.asciidoc b/docs/reference/modules/cross-cluster-search.asciidoc index 61b0bb50aedc7..b59f74198c3e8 100644 --- a/docs/reference/modules/cross-cluster-search.asciidoc +++ b/docs/reference/modules/cross-cluster-search.asciidoc @@ -65,6 +65,7 @@ GET /cluster_one:twitter/_search { "took": 150, "timed_out": false, + "num_reduce_phases": 2, "_shards": { "total": 1, "successful": 1, @@ -130,6 +131,7 @@ will be prefixed with their remote cluster name: { "took": 150, "timed_out": false, + "num_reduce_phases": 3, "_shards": { "total": 2, "successful": 2, @@ -222,6 +224,7 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1> { "took": 150, "timed_out": false, + "num_reduce_phases": 3, "_shards": { "total": 2, "successful": 2, @@ -273,3 +276,43 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1> // TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/] // TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/] <1> The `clusters` section indicates that one cluster was unavailable and got skipped + +[float] +[[ccs-reduction]] +=== CCS reduction phase + +Cross-cluster search requests can be executed in two ways: + +- the CCS coordinating node minimizes network round-trips by sending one search +request to each cluster. Each cluster performs the search independently, +reducing and fetching results. Once the CCS node has received all the +responses, it performs another reduction and returns the relevant results back +to the user. This strategy is beneficial when there is network latency between +the CCS coordinating node and the remote clusters involved, which is typically +the case. A single request is sent to each remote cluster, at the cost of +retrieving `from` + `size` already fetched results. This is the default +strategy, used whenever possible. In case a scroll is provided, or inner hits +are requested as part of field collapsing, this strategy is not supported hence +network round-trips cannot be minimized and the following strategy is used +instead. + +- the CCS coordinating node sends a <> request to +each remote cluster, in order to collect information about their corresponding +remote indices involved in the search request and the shards where their data +is located. Once each cluster has responded to such request, the search +executes as if all shards were part of the same cluster. The coordinating node +sends one request to each shard involved, each shard executes the query and +returns its own results which are then reduced (and fetched, depending on the +<>) by the CCS coordinating node. +This strategy may be beneficial whenever there is very low network latency +between the CCS coordinating node and the remote clusters involved, as it +treats all shards the same, at the cost of sending many requests to each remote +cluster, which is problematic in presence of network latency. + +The <> supports the `ccs_minimize_roundtrips` +parameter, which defaults to `true` and can be set to `false` in case +minimizing network round-trips is not desirable. + +Note that all the communication between the nodes, regardless of which cluster +they belong to and the selected reduce mode, happens through the +<>. diff --git a/docs/reference/search/request-body.asciidoc b/docs/reference/search/request-body.asciidoc index dac7622aab8ed..120c4c6757599 100644 --- a/docs/reference/search/request-body.asciidoc +++ b/docs/reference/search/request-body.asciidoc @@ -113,6 +113,11 @@ And here is a sample response: reduce the memory overhead per search request if the potential number of shards in the request can be large. +`ccs_minimize_roundtrips`:: + + Defaults to `true`. Set to `false` to disable minimizing network round-trips + between the coordinating node and the remote clusters when executing + cross-cluster search requests. See <> for more. Out of the above, the `search_type`, `request_cache` and the `allow_partial_search_results` diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateRequest.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateRequest.java index a685c3ba5ba7c..c80f99484a947 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateRequest.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateRequest.java @@ -65,7 +65,6 @@ public MultiSearchTemplateRequest add(SearchTemplateRequest request) { return this; } - /** * Returns the amount of search requests specified in this multi search requests are allowed to be ran concurrently. */ diff --git a/modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponseTests.java b/modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponseTests.java index 2c67dd4709bc9..dadaf7cb05a09 100644 --- a/modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponseTests.java +++ b/modules/lang-mustache/src/test/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponseTests.java @@ -50,7 +50,7 @@ protected MultiSearchTemplateResponse createTestInstance() { int successfulShards = randomIntBetween(0, totalShards); int skippedShards = totalShards - successfulShards; InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); + SearchResponse.Clusters clusters = randomClusters(); SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse(); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); @@ -59,7 +59,13 @@ protected MultiSearchTemplateResponse createTestInstance() { } return new MultiSearchTemplateResponse(items, overallTookInMillis); } - + + private static SearchResponse.Clusters randomClusters() { + int totalClusters = randomIntBetween(0, 10); + int successfulClusters = randomIntBetween(0, totalClusters); + int skippedClusters = totalClusters - successfulClusters; + return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); + } private static MultiSearchTemplateResponse createTestInstanceWithFailures() { int numItems = randomIntBetween(0, 128); @@ -67,14 +73,13 @@ private static MultiSearchTemplateResponse createTestInstanceWithFailures() { MultiSearchTemplateResponse.Item[] items = new MultiSearchTemplateResponse.Item[numItems]; for (int i = 0; i < numItems; i++) { if (randomBoolean()) { - // Creating a minimal response is OK, because SearchResponse self - // is tested elsewhere. + // Creating a minimal response is OK, because SearchResponse is tested elsewhere. long tookInMillis = randomNonNegativeLong(); int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int successfulShards = randomIntBetween(0, totalShards); int skippedShards = totalShards - successfulShards; InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); + SearchResponse.Clusters clusters = randomClusters(); SearchTemplateResponse searchTemplateResponse = new SearchTemplateResponse(); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); @@ -133,6 +138,5 @@ public void testFromXContentWithFailures() throws IOException { AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, getRandomFieldsExcludeFilterWhenResultHasErrors(), this::createParser, this::doParseInstance, this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS); - } - + } } diff --git a/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java b/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java index efac9ac220573..e280b1d2d1a05 100644 --- a/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java +++ b/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java @@ -22,6 +22,7 @@ import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; +import org.apache.lucene.search.TotalHits; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; @@ -32,9 +33,11 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; @@ -49,6 +52,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; @@ -107,6 +112,14 @@ private static MockTransportService startTransport( channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); }); + newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new, + (request, channel, task) -> { + InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0], + new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1); + SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); + channel.sendResponse(searchResponse); + }); newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, (request, channel, task) -> { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index 5acf84139bbf4..4499a60bfe24a 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -36,6 +36,10 @@ terms: field: f1.keyword + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} + - match: {num} - match: { _shards.total: 5 } - match: { hits.total: 11 } - gte: { hits.hits.0._seq_no: 0 } @@ -59,6 +63,9 @@ terms: field: f1.keyword + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} - match: { _shards.total: 5 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -76,6 +83,9 @@ terms: field: f1.keyword + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -93,6 +103,7 @@ terms: field: f1.keyword + - is_false: _clusters - match: { _shards.total: 2 } - match: { hits.total: 5} - match: { hits.hits.0._index: "test_index"} @@ -122,6 +133,9 @@ rest_total_hits_as_int: true index: test_remote_cluster:test_index + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6 } - match: { hits.hits.0._index: "test_remote_cluster:test_index" } @@ -148,6 +162,9 @@ rest_total_hits_as_int: true index: "*:test_index" + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} - match: { _shards.total: 6 } - match: { hits.total: 12 } @@ -159,6 +176,9 @@ rest_total_hits_as_int: true index: my_remote_cluster:aliased_test_index + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 2 } - match: { hits.hits.0._source.filter_field: 1 } @@ -172,6 +192,9 @@ rest_total_hits_as_int: true index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1 + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 4 } - match: { hits.total: 2 } - match: { hits.hits.0._source.filter_field: 1 } @@ -185,6 +208,9 @@ rest_total_hits_as_int: true index: "my_remote_cluster:single_doc_index" + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 1 } - match: { hits.total: 1 } - match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"} diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/70_skip_shards.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/70_skip_shards.yml index 81c09da54c085..d1a5a273e1d0f 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/70_skip_shards.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/70_skip_shards.yml @@ -29,10 +29,12 @@ rest_total_hits_as_int: true index: "skip_shards_index,my_remote_cluster:single_doc_index" pre_filter_shard_size: 1 + ccs_minimize_roundtrips: false body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } } - match: { hits.total: 1 } - match: { hits.hits.0._index: "skip_shards_index"} + - is_false: num_reduce_phases - match: { _shards.total: 2 } - match: { _shards.successful: 2 } - match: { _shards.skipped : 1} @@ -45,10 +47,12 @@ rest_total_hits_as_int: true index: "skip_shards_index,my_remote_cluster:single_doc_index" pre_filter_shard_size: 1 + ccs_minimize_roundtrips: false body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2015-02-01", "lt": "2016-02-01"} } } } - match: { hits.total: 1 } - match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"} + - is_false: num_reduce_phases - match: { _shards.total: 2 } - match: { _shards.successful: 2 } - match: { _shards.skipped : 1} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json index 13a6005c9a189..398dcbd29515d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json @@ -43,6 +43,11 @@ "type" : "boolean", "description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default" : false + }, + "ccs_minimize_roundtrips": { + "type" : "boolean", + "description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution", + "default" : "true" } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch_template.json b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch_template.json index e49cc2083f929..e89f96e06960f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch_template.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch_template.json @@ -33,6 +33,11 @@ "type" : "boolean", "description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default" : false + }, + "ccs_minimize_roundtrips": { + "type" : "boolean", + "description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution", + "default" : "true" } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index 9ac02b1214a2f..f44c0f74b2c3d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -24,6 +24,11 @@ "type" : "boolean", "description" : "Specify whether wildcard and prefix queries should be analyzed (default: false)" }, + "ccs_minimize_roundtrips": { + "type" : "boolean", + "description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution", + "default" : "true" + }, "default_operator": { "type" : "enum", "options" : ["AND","OR"], diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search_template.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search_template.json index 528ee905e7ee8..24b7fa135b331 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search_template.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search_template.json @@ -67,6 +67,11 @@ "type" : "boolean", "description" : "Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default" : false + }, + "ccs_minimize_roundtrips": { + "type" : "boolean", + "description" : "Indicates whether network round-trips should be minimized as part of cross-cluster search requests execution", + "default" : "true" } } }, diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java index a574c0559804b..1dc2dc624b277 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java @@ -173,6 +173,7 @@ public static void readMultiLineFormat(BytesReference data, String[] types, String routing, String searchType, + Boolean ccsMinimizeRoundtrips, NamedXContentRegistry registry, boolean allowExplicitIndex) throws IOException { int from = 0; @@ -205,6 +206,9 @@ public static void readMultiLineFormat(BytesReference data, if (searchType != null) { searchRequest.searchType(searchType); } + if (ccsMinimizeRoundtrips != null) { + searchRequest.setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips); + } IndicesOptions defaultOptions = searchRequest.indicesOptions(); // now parse the action if (nextMarker - from > 0) { @@ -226,6 +230,8 @@ public static void readMultiLineFormat(BytesReference data, searchRequest.types(nodeStringArrayValue(value)); } else if ("search_type".equals(entry.getKey()) || "searchType".equals(entry.getKey())) { searchRequest.searchType(nodeStringValue(value, null)); + } else if ("ccs_minimize_roundtrips".equals(entry.getKey()) || "ccsMinimizeRoundtrips".equals(entry.getKey())) { + searchRequest.setCcsMinimizeRoundtrips(nodeBooleanValue(value)); } else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) { searchRequest.requestCache(nodeBooleanValue(value, entry.getKey())); } else if ("preference".equals(entry.getKey())) { @@ -327,6 +333,7 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild if (request.searchType() != null) { xContentBuilder.field("search_type", request.searchType().name().toLowerCase(Locale.ROOT)); } + xContentBuilder.field("ccs_minimize_roundtrips", request.isCcsMinimizeRoundtrips()); if (request.requestCache() != null) { xContentBuilder.field("request_cache", request.requestCache()); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 020887068f015..55122b6806fd2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -93,6 +93,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private String[] types = Strings.EMPTY_ARRAY; + private boolean ccsMinimizeRoundtrips = true; + public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled(); private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; @@ -106,21 +108,7 @@ public SearchRequest() { * Constructs a new search request from the provided search request */ public SearchRequest(SearchRequest searchRequest) { - this.allowPartialSearchResults = searchRequest.allowPartialSearchResults; - this.batchedReduceSize = searchRequest.batchedReduceSize; - this.indices = searchRequest.indices; - this.indicesOptions = searchRequest.indicesOptions; - this.maxConcurrentShardRequests = searchRequest.maxConcurrentShardRequests; - this.preference = searchRequest.preference; - this.preFilterShardSize = searchRequest.preFilterShardSize; - this.requestCache = searchRequest.requestCache; - this.routing = searchRequest.routing; - this.scroll = searchRequest.scroll; - this.searchType = searchRequest.searchType; - this.source = searchRequest.source; - this.types = searchRequest.types; - this.localClusterAlias = searchRequest.localClusterAlias; - this.absoluteStartMillis = searchRequest.absoluteStartMillis; + this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, searchRequest.absoluteStartMillis); } /** @@ -144,16 +132,40 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) { } /** - * Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in - * milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search - * request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in - * the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used. + * Creates a new search request by providing the search request to copy all fields from, the indices to search against, + * the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time. + * Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction + * on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the + * alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters + * to ensure that the same value is used. */ - SearchRequest(String localClusterAlias, long absoluteStartMillis) { - this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null"); + static SearchRequest withLocalReduction(SearchRequest originalSearchRequest, String[] indices, + String localClusterAlias, long absoluteStartMillis) { + Objects.requireNonNull(originalSearchRequest, "search request must not be null"); + validateIndices(indices); + Objects.requireNonNull(localClusterAlias, "cluster alias must not be null"); if (absoluteStartMillis < 0) { throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]"); } + return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis); + } + + private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis) { + this.allowPartialSearchResults = searchRequest.allowPartialSearchResults; + this.batchedReduceSize = searchRequest.batchedReduceSize; + this.ccsMinimizeRoundtrips = searchRequest.ccsMinimizeRoundtrips; + this.indices = indices; + this.indicesOptions = searchRequest.indicesOptions; + this.maxConcurrentShardRequests = searchRequest.maxConcurrentShardRequests; + this.preference = searchRequest.preference; + this.preFilterShardSize = searchRequest.preFilterShardSize; + this.requestCache = searchRequest.requestCache; + this.routing = searchRequest.routing; + this.scroll = searchRequest.scroll; + this.searchType = searchRequest.searchType; + this.source = searchRequest.source; + this.types = searchRequest.types; + this.localClusterAlias = localClusterAlias; this.absoluteStartMillis = absoluteStartMillis; } @@ -191,6 +203,9 @@ public SearchRequest(StreamInput in) throws IOException { localClusterAlias = null; absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; } + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + ccsMinimizeRoundtrips = in.readBoolean(); + } } @Override @@ -217,33 +232,37 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(absoluteStartMillis); } } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeBoolean(ccsMinimizeRoundtrips); + } } @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - final Scroll scroll = scroll(); - if (source != null - && source.trackTotalHitsUpTo() != null - && source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE - && scroll != null) { - validationException = - addValidationError("disabling [track_total_hits] is not allowed in a scroll context", validationException); - } - if (source != null && source.from() > 0 && scroll != null) { - validationException = - addValidationError("using [from] is not allowed in a scroll context", validationException); - } - if (requestCache != null && requestCache && scroll != null) { - validationException = - addValidationError("[request_cache] cannot be used in a scroll context", validationException); - } - if (source != null && source.size() == 0 && scroll != null) { - validationException = addValidationError("[size] cannot be [0] in a scroll context", validationException); - } - if (source != null && source.rescores() != null && source.rescores().isEmpty() == false && scroll != null) { - validationException = - addValidationError("using [rescore] is not allowed in a scroll context", validationException); + boolean scroll = scroll() != null; + if (scroll) { + if (source != null) { + if (source.trackTotalHitsUpTo() != null && source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE) { + validationException = + addValidationError("disabling [track_total_hits] is not allowed in a scroll context", validationException); + } + if (source.from() > 0) { + validationException = + addValidationError("using [from] is not allowed in a scroll context", validationException); + } + if (source.size() == 0) { + validationException = addValidationError("[size] cannot be [0] in a scroll context", validationException); + } + if (source.rescores() != null && source.rescores().isEmpty() == false) { + validationException = + addValidationError("using [rescore] is not allowed in a scroll context", validationException); + } + } + if (requestCache != null && requestCache) { + validationException = + addValidationError("[request_cache] cannot be used in a scroll context", validationException); + } } return validationException; } @@ -261,8 +280,8 @@ String getLocalClusterAlias() { /** * Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to * ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search - * request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise - * it will return {@link System#currentTimeMillis()}. + * request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided + * current time, otherwise it will return {@link System#currentTimeMillis()}. * */ long getOrCreateAbsoluteStartMillis() { @@ -274,12 +293,16 @@ long getOrCreateAbsoluteStartMillis() { */ @Override public SearchRequest indices(String... indices) { + validateIndices(indices); + this.indices = indices; + return this; + } + + private static void validateIndices(String... indices) { Objects.requireNonNull(indices, "indices must not be null"); for (String index : indices) { Objects.requireNonNull(index, "index must not be null"); } - this.indices = indices; - return this; } @Override @@ -292,6 +315,21 @@ public SearchRequest indicesOptions(IndicesOptions indicesOptions) { return this; } + /** + * Returns whether network round-trips should be minimized when executing cross-cluster search requests. + * Defaults to true. + */ + public boolean isCcsMinimizeRoundtrips() { + return ccsMinimizeRoundtrips; + } + + /** + * Sets whether network round-trips should be minimized when executing cross-cluster search requests. Defaults to true. + */ + public void setCcsMinimizeRoundtrips(boolean ccsMinimizeRoundtrips) { + this.ccsMinimizeRoundtrips = ccsMinimizeRoundtrips; + } + /** * The document types to execute the search against. Defaults to be executed against * all types. @@ -583,14 +621,15 @@ public boolean equals(Object o) { Objects.equals(indicesOptions, that.indicesOptions) && Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) && Objects.equals(localClusterAlias, that.localClusterAlias) && - absoluteStartMillis == that.absoluteStartMillis; + absoluteStartMillis == that.absoluteStartMillis && + ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips; } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults, localClusterAlias, absoluteStartMillis); + allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips); } @Override @@ -610,6 +649,7 @@ public String toString() { ", allowPartialSearchResults=" + allowPartialSearchResults + ", localClusterAlias=" + localClusterAlias + ", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis + + ", ccsMinimizeRoundtrips=" + ccsMinimizeRoundtrips + ", source=" + source + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 0273d5e58219a..dd0d4de07d6f4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -111,7 +111,6 @@ public Aggregations getAggregations() { return internalResponse.aggregations(); } - public Suggest getSuggest() { return internalResponse.suggest(); } @@ -349,7 +348,7 @@ static SearchResponse innerFromXContent(XContentParser parser) throws IOExceptio SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly, profile, numReducePhases); return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, - failures.toArray(new ShardSearchFailure[failures.size()]), clusters); + failures.toArray(ShardSearchFailure.EMPTY_ARRAY), clusters); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java index b146d42c0d2e6..567040246c50f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java @@ -39,6 +39,7 @@ import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.transport.RemoteClusterAware; import java.util.ArrayList; import java.util.Arrays; @@ -76,9 +77,9 @@ //from the remote clusters in the fetch phase. This would be identical to the removed QueryAndFetch strategy except that only the remote //cluster response would have the fetch results. final class SearchResponseMerger { - private final int from; - private final int size; - private final int trackTotalHitsUpTo; + final int from; + final int size; + final int trackTotalHitsUpTo; private final SearchTimeProvider searchTimeProvider; private final Function reduceContextFunction; private final List searchResponses = new CopyOnWriteArrayList<>(); @@ -98,15 +99,28 @@ final class SearchResponseMerger { * That may change in the future as it's possible to introduce incremental merges as responses come in if necessary. */ void add(SearchResponse searchResponse) { + assert searchResponse.getScrollId() == null : "merging scroll results is not supported"; searchResponses.add(searchResponse); } + int numResponses() { + return searchResponses.size(); + } + /** * Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)} * so that all responses are merged into a single one. */ SearchResponse getMergedResponse(Clusters clusters) { - assert searchResponses.size() > 1; + //if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true, + //we end up calling merge without anything to merge, we just return an empty search response + if (searchResponses.size() == 0) { + SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits, + InternalAggregations.EMPTY, null, null, false, null, 0); + return new SearchResponse(internalSearchResponse, null, 0, 0, 0, searchTimeProvider.buildTookInMillis(), + ShardSearchFailure.EMPTY_ARRAY, clusters); + } int totalShards = 0; int skippedShards = 0; int successfulShards = 0; @@ -115,7 +129,7 @@ SearchResponse getMergedResponse(Clusters clusters) { List failures = new ArrayList<>(); Map profileResults = new HashMap<>(); List aggs = new ArrayList<>(); - Map shards = new TreeMap<>(); + Map shards = new TreeMap<>(); List topDocsList = new ArrayList<>(searchResponses.size()); Map> groupedSuggestions = new HashMap<>(); Boolean trackTotalHits = null; @@ -171,10 +185,11 @@ SearchResponse getMergedResponse(Clusters clusters) { Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions)); InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContextFunction.apply(true)); ShardSearchFailure[] shardFailures = failures.toArray(ShardSearchFailure.EMPTY_ARRAY); + SearchProfileShardResults profileShardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults); //make failures ordering consistent with ordinary search and CCS Arrays.sort(shardFailures, FAILURES_COMPARATOR); - InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, - new SearchProfileShardResults(profileResults), topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases); + InternalSearchResponse response = new InternalSearchResponse(mergedSearchHits, reducedAggs, suggest, profileShardResults, + topDocsStats.timedOut, topDocsStats.terminatedEarly, numReducePhases); long tookInMillis = searchTimeProvider.buildTookInMillis(); return new SearchResponse(response, null, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters); } @@ -210,7 +225,7 @@ private ShardId extractShardId(ShardSearchFailure failure) { } }; - private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map shards) { + private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map shards) { SearchHit[] hits = searchHits.getHits(); ScoreDoc[] scoreDocs = new ScoreDoc[hits.length]; final TopDocs topDocs; @@ -228,7 +243,8 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota for (int i = 0; i < hits.length; i++) { SearchHit hit = hits[i]; - ShardId shardId = hit.getShard().getShardId(); + SearchShardTarget shard = hit.getShard(); + ShardIdAndClusterAlias shardId = new ShardIdAndClusterAlias(shard.getShardId(), shard.getClusterAlias()); shards.putIfAbsent(shardId, null); final SortField[] sortFields = searchHits.getSortFields(); final Object[] sortValues; @@ -246,18 +262,21 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota return topDocs; } - private static void setShardIndex(Map shards, List topDocsList) { - int shardIndex = 0; - for (Map.Entry shard : shards.entrySet()) { - shard.setValue(shardIndex++); + private static void setShardIndex(Map shards, List topDocsList) { + { + //assign a different shardIndex to each shard, based on their shardId natural ordering and their cluster alias + int shardIndex = 0; + for (Map.Entry shard : shards.entrySet()) { + shard.setValue(shardIndex++); + } } - //and go through all the scoreDocs from each cluster and set their corresponding shardIndex + //go through all the scoreDocs from each cluster and set their corresponding shardIndex for (TopDocs topDocs : topDocsList) { for (ScoreDoc scoreDoc : topDocs.scoreDocs) { FieldDocAndSearchHit fieldDocAndSearchHit = (FieldDocAndSearchHit) scoreDoc; - //When hits come from the indices with same names on multiple clusters and same shard identifier, we rely on such indices - //to have a different uuid across multiple clusters. That's how they will get a different shardIndex. - ShardId shardId = fieldDocAndSearchHit.searchHit.getShard().getShardId(); + SearchShardTarget shard = fieldDocAndSearchHit.searchHit.getShard(); + ShardIdAndClusterAlias shardId = new ShardIdAndClusterAlias(shard.getShardId(), shard.getClusterAlias()); + assert shards.containsKey(shardId); fieldDocAndSearchHit.shardIndex = shards.get(shardId); } } @@ -294,4 +313,58 @@ private static final class FieldDocAndSearchHit extends FieldDoc { this.searchHit = searchHit; } } + + /** + * This class is used instead of plain {@link ShardId} to support the scenario where the same remote cluster is registered twice using + * different aliases. In that case searching across the same cluster twice would make an assertion in lucene fail + * (see TopDocs#tieBreakLessThan line 86). Generally, indices with same names on different clusters have different index uuids which + * make their ShardIds different, which is not the case if the index is really the same one from the same cluster, in which case we + * need to look at the cluster alias and make sure to assign a different shardIndex based on that. + */ + private static final class ShardIdAndClusterAlias implements Comparable { + private final ShardId shardId; + private final String clusterAlias; + + ShardIdAndClusterAlias(ShardId shardId, String clusterAlias) { + this.shardId = shardId; + this.clusterAlias = clusterAlias; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ShardIdAndClusterAlias that = (ShardIdAndClusterAlias) o; + return shardId.equals(that.shardId) && + clusterAlias.equals(that.clusterAlias); + } + + @Override + public int hashCode() { + return Objects.hash(shardId, clusterAlias); + } + + @Override + public int compareTo(ShardIdAndClusterAlias o) { + int shardIdCompareTo = shardId.compareTo(o.shardId); + if (shardIdCompareTo != 0) { + return shardIdCompareTo; + } + int clusterAliasCompareTo = clusterAlias.compareTo(o.clusterAlias); + if (clusterAliasCompareTo != 0) { + //TODO we may want to fix this, CCS returns remote results before local ones (TransportSearchAction#mergeShardsIterators) + if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + return 1; + } + if (o.clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + return -1; + } + } + return clusterAliasCompareTo; + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 30e030eca7376..48ae3f1249522 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -47,8 +47,10 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; @@ -69,6 +71,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -190,8 +193,8 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime); ActionListener rewriteListener = ActionListener.wrap(source -> { if (source != searchRequest.source()) { - // only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch - // situations when it possible due to a bug changes to null + // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch + // situations when source is rewritten to null due to a bug searchRequest.source(source); } final ClusterState clusterState = clusterService.state(); @@ -199,26 +202,31 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState)); OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); if (remoteClusterIndices.isEmpty()) { - executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(), - (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); + executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener); } else { - AtomicInteger skippedClusters = new AtomicInteger(0); - collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters, - remoteClusterIndices, remoteClusterService, threadPool, - ActionListener.wrap( - searchShardsResponses -> { - List remoteShardIterators = new ArrayList<>(); - Map remoteAliasFilters = new HashMap<>(); - BiFunction clusterNodeLookup = processRemoteShards( - searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - int localClusters = localIndices == null ? 0 : 1; - int totalClusters = remoteClusterIndices.size() + localClusters; - int successfulClusters = searchShardsResponses.size() + localClusters; - executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, - remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, - new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get())); - }, - listener::onFailure)); + if (shouldMinimizeRoundtrips(searchRequest)) { + ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext, + remoteClusterService, threadPool, listener, + (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l)); + } else { + AtomicInteger skippedClusters = new AtomicInteger(0); + collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), + skippedClusters, remoteClusterIndices, remoteClusterService, threadPool, + ActionListener.wrap( + searchShardsResponses -> { + List remoteShardIterators = new ArrayList<>(); + Map remoteAliasFilters = new HashMap<>(); + BiFunction clusterNodeLookup = processRemoteShards( + searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + int localClusters = localIndices == null ? 0 : 1; + int totalClusters = remoteClusterIndices.size() + localClusters; + int successfulClusters = searchShardsResponses.size() + localClusters; + executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, + remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, + new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get())); + }, + listener::onFailure)); + } } }, listener::onFailure); if (searchRequest.source() == null) { @@ -229,12 +237,79 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< } } + static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) { + if (searchRequest.isCcsMinimizeRoundtrips() == false) { + return false; + } + if (searchRequest.scroll() != null) { + return false; + } + SearchSourceBuilder source = searchRequest.source(); + return source == null || source.collapse() == null || source.collapse().getInnerHits() == null || + source.collapse().getInnerHits().isEmpty(); + } + + static void ccsRemoteReduce(SearchRequest searchRequest, OriginalIndices localIndices, Map remoteIndices, + SearchTimeProvider timeProvider, Function reduceContext, + RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener listener, + BiConsumer> localSearchConsumer) { + SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, reduceContext); + AtomicInteger skippedClusters = new AtomicInteger(0); + final AtomicReference exceptions = new AtomicReference<>(); + int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1); + final CountDown countDown = new CountDown(totalClusters); + for (Map.Entry entry : remoteIndices.entrySet()) { + String clusterAlias = entry.getKey(); + boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + OriginalIndices indices = entry.getValue(); + SearchRequest ccsSearchRequest = SearchRequest.withLocalReduction(searchRequest, indices.indices(), + clusterAlias, timeProvider.getAbsoluteStartMillis()); + ActionListener ccsListener = createCCSListener(clusterAlias, skipUnavailable, countDown, + skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); + Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); + remoteClusterClient.search(ccsSearchRequest, ccsListener); + } + if (localIndices != null) { + ActionListener ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, + false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener); + //here we provide the empty string a cluster alias, which means no prefix in index name, + //but the coord node will perform non final reduce as it's not null. + SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(), + RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis()); + localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener); + } + } + + static SearchResponseMerger createSearchResponseMerger(SearchSourceBuilder source, SearchTimeProvider timeProvider, + Function reduceContextFunction) { + final int from; + final int size; + final int trackTotalHitsUpTo; + if (source == null) { + from = SearchService.DEFAULT_FROM; + size = SearchService.DEFAULT_SIZE; + trackTotalHitsUpTo = SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO; + } else { + from = source.from() == -1 ? SearchService.DEFAULT_FROM : source.from(); + size = source.size() == -1 ? SearchService.DEFAULT_SIZE : source.size(); + trackTotalHitsUpTo = source.trackTotalHitsUpTo() == null + ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo(); + //here we modify the original source so we can re-use it by setting it to each outgoing search request + source.from(0); + source.size(from + size); + //TODO when searching only against a remote cluster, we could ask directly for the final number of results and let + //the remote cluster do a final reduction, yet that is not possible as we are providing a localClusterAlias which + //will automatically make the reduction non final + } + return new SearchResponseMerger(from, size, trackTotalHitsUpTo, timeProvider, reduceContextFunction); + } + static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters, Map remoteIndicesByCluster, RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener> listener) { final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); final Map searchShardsResponses = new ConcurrentHashMap<>(); - final AtomicReference transportException = new AtomicReference<>(); + final AtomicReference exceptions = new AtomicReference<>(); for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { final String clusterAlias = entry.getKey(); boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); @@ -242,49 +317,53 @@ static void collectSearchShards(IndicesOptions indicesOptions, String preference final String[] indices = entry.getValue().indices(); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); - clusterClient.admin().cluster().searchShards(searchShardsRequest, new ActionListener() { + clusterClient.admin().cluster().searchShards(searchShardsRequest, + new CCSActionListener>( + clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener) { @Override - public void onResponse(ClusterSearchShardsResponse response) { - searchShardsResponses.put(clusterAlias, response); - maybeFinish(); + void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { + searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse); } @Override - public void onFailure(Exception e) { - if (skipUnavailable) { - skippedClusters.incrementAndGet(); - } else { - RemoteTransportException exception = - new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); - if (transportException.compareAndSet(null, exception) == false) { - transportException.accumulateAndGet(exception, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); - } - } - maybeFinish(); - } - - private void maybeFinish() { - if (responsesCountDown.countDown()) { - RemoteTransportException exception = transportException.get(); - if (exception == null) { - listener.onResponse(searchShardsResponses); - } else { - listener.onFailure(transportException.get()); - } - } + Map createFinalResponse() { + return searchShardsResponses; } } ); } } + private static ActionListener createCCSListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, + AtomicInteger skippedClusters, AtomicReference exceptions, + SearchResponseMerger searchResponseMerger, int totalClusters, + ActionListener originalListener) { + return new CCSActionListener(clusterAlias, skipUnavailable, countDown, skippedClusters, + exceptions, originalListener) { + @Override + void innerOnResponse(SearchResponse searchResponse) { + searchResponseMerger.add(searchResponse); + } + + @Override + SearchResponse createFinalResponse() { + SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalClusters, searchResponseMerger.numResponses(), + skippedClusters.get()); + return searchResponseMerger.getMergedResponse(clusters); + } + }; + } + + private void executeLocalSearch(Task task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices, + ClusterState clusterState, ActionListener listener) { + executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(), + (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); + } + static BiFunction processRemoteShards(Map searchShardsResponses, - Map remoteIndicesByCluster, - List remoteShardIterators, - Map aliasFilterMap) { + Map remoteIndicesByCluster, + List remoteShardIterators, + Map aliasFilterMap) { Map> clusterToNode = new HashMap<>(); for (Map.Entry entry : searchShardsResponses.entrySet()) { String clusterAlias = entry.getKey(); @@ -491,4 +570,70 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int + "] to a greater value if you really want to query that many shards at the same time."); } } + + abstract static class CCSActionListener implements ActionListener { + private final String clusterAlias; + private final boolean skipUnavailable; + private final CountDown countDown; + private final AtomicInteger skippedClusters; + private final AtomicReference exceptions; + private final ActionListener originalListener; + + CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, + AtomicReference exceptions, ActionListener originalListener) { + this.clusterAlias = clusterAlias; + this.skipUnavailable = skipUnavailable; + this.countDown = countDown; + this.skippedClusters = skippedClusters; + this.exceptions = exceptions; + this.originalListener = originalListener; + } + + @Override + public final void onResponse(Response response) { + innerOnResponse(response); + maybeFinish(); + } + + abstract void innerOnResponse(Response response); + + @Override + public final void onFailure(Exception e) { + if (skipUnavailable) { + skippedClusters.incrementAndGet(); + } else { + Exception exception = e; + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { + exception = new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); + } + if (exceptions.compareAndSet(null, exception) == false) { + exceptions.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + } + maybeFinish(); + } + + private void maybeFinish() { + if (countDown.countDown()) { + Exception exception = exceptions.get(); + if (exception == null) { + FinalResponse response; + try { + response = createFinalResponse(); + } catch(Exception e) { + originalListener.onFailure(e); + return; + } + originalListener.onResponse(response); + } else { + originalListener.onFailure(exceptions.get()); + } + } + } + + abstract FinalResponse createFinalResponse(); + } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index dde71ad68e17f..2de583b460f84 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -1009,7 +1009,7 @@ public List readNamedWriteableList(Class catego } /** - * Reads an enum with type E that was serialized based on the value of it's ordinal + * Reads an enum with type E that was serialized based on the value of its ordinal */ public > E readEnum(Class enumClass) throws IOException { int ordinal = readVInt(); diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 3031e2f2e7164..175f800a7d8cf 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -1094,7 +1094,7 @@ public void writeNamedWriteableList(List list) throws } /** - * Writes an enum with type E that by serialized it based on it's ordinal value + * Writes an enum with type E based on its ordinal value */ public > void writeEnum(E enumValue) throws IOException { writeVInt(enumValue.ordinal()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index 49bebe053a3f9..05a20a0cc06b9 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -147,13 +147,14 @@ public static void parseMultiLineRequest(RestRequest request, IndicesOptions ind String[] indices = Strings.splitStringByCommaToArray(request.param("index")); String[] types = Strings.splitStringByCommaToArray(request.param("type")); String searchType = request.param("search_type"); + boolean ccsMinimizeRoundtrips = request.paramAsBoolean("ccs_minimize_roundtrips", true); String routing = request.param("routing"); final Tuple sourceTuple = request.contentOrSourceParam(); final XContent xContent = sourceTuple.v1().xContent(); final BytesReference data = sourceTuple.v2(); MultiSearchRequest.readMultiLineFormat(data, xContent, consumer, indices, indicesOptions, types, routing, - searchType, request.getXContentRegistry(), allowExplicitIndex); + searchType, ccsMinimizeRoundtrips, request.getXContentRegistry(), allowExplicitIndex); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 78082dd364173..00c08a124f1e4 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -173,6 +173,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions())); + searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true)); checkRestTotalHits(request, searchRequest); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index ef255c8af7ad1..a14b4a328775c 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -148,6 +148,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public static final Setting MAX_OPEN_SCROLL_CONTEXT = Setting.intSetting("search.max_open_scroll_context", 500, 0, Property.Dynamic, Property.NodeScope); + public static final int DEFAULT_SIZE = 10; + public static final int DEFAULT_FROM = 0; private final ThreadPool threadPool; @@ -606,10 +608,10 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException // if the from and size are still not set, default them if (context.from() == -1) { - context.from(0); + context.from(DEFAULT_FROM); } if (context.size() == -1) { - context.size(10); + context.size(DEFAULT_SIZE); } // pre process diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index bd12f46564bac..da22ce4c96c1a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -180,7 +180,7 @@ public void testSimpleAdd4() throws Exception { assertThat(request.requests().get(2).routing(), equalTo("123")); } - public void testResponseErrorToXContent() throws IOException { + public void testResponseErrorToXContent() { long tookInMillis = randomIntBetween(1, 1000); MultiSearchResponse response = new MultiSearchResponse( new MultiSearchResponse.Item[] { @@ -262,12 +262,12 @@ public void testMultiLineSerialization() throws IOException { parsedRequest.add(r); }; MultiSearchRequest.readMultiLineFormat(new BytesArray(originalBytes), xContentType.xContent(), - consumer, null, null, null, null, null, xContentRegistry(), true); + consumer, null, null, null, null, null, null, xContentRegistry(), true); assertEquals(originalRequest, parsedRequest); } } - public void testEqualsAndHashcode() throws IOException { + public void testEqualsAndHashcode() { checkEqualsAndHashCode(createMultiSearchRequest(), MultiSearchRequestTests::copyRequest, MultiSearchRequestTests::mutate); } @@ -282,7 +282,7 @@ private static MultiSearchRequest mutate(MultiSearchRequest searchRequest) throw return mutation; } - private static MultiSearchRequest copyRequest(MultiSearchRequest request) throws IOException { + private static MultiSearchRequest copyRequest(MultiSearchRequest request) { MultiSearchRequest copy = new MultiSearchRequest(); if (request.maxConcurrentSearchRequests() > 0) { copy.maxConcurrentSearchRequests(request.maxConcurrentSearchRequests()); @@ -294,7 +294,7 @@ private static MultiSearchRequest copyRequest(MultiSearchRequest request) throws return copy; } - private static MultiSearchRequest createMultiSearchRequest() throws IOException { + private static MultiSearchRequest createMultiSearchRequest() { int numSearchRequest = randomIntBetween(1, 128); MultiSearchRequest request = new MultiSearchRequest(); for (int j = 0; j < numSearchRequest; j++) { @@ -321,7 +321,7 @@ private static MultiSearchRequest createMultiSearchRequest() throws IOException return request; } - private static SearchRequest createSimpleSearchRequest() throws IOException { + private static SearchRequest createSimpleSearchRequest() { return randomSearchRequest(() -> { // No need to return a very complex SearchSourceBuilder here, that is tested elsewhere SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java index 4bd4406d81cca..d91a4eaf02288 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchResponseTests.java @@ -46,8 +46,8 @@ protected MultiSearchResponse createTestInstance() { int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int successfulShards = randomIntBetween(0, totalShards); int skippedShards = totalShards - successfulShards; + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); items[i] = new MultiSearchResponse.Item(searchResponse, null); @@ -60,14 +60,13 @@ private static MultiSearchResponse createTestInstanceWithFailures() { MultiSearchResponse.Item[] items = new MultiSearchResponse.Item[numItems]; for (int i = 0; i < numItems; i++) { if (randomBoolean()) { - // Creating a minimal response is OK, because SearchResponse self - // is tested elsewhere. + // Creating a minimal response is OK, because SearchResponse is tested elsewhere. long tookInMillis = randomNonNegativeLong(); int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int successfulShards = randomIntBetween(0, totalShards); int skippedShards = totalShards - successfulShards; + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse.Clusters clusters = new SearchResponse.Clusters(totalShards, successfulShards, skippedShards); SearchResponse searchResponse = new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters); items[i] = new MultiSearchResponse.Item(searchResponse, null); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 91f6c0c09cd20..1d2d59c60e2ae 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -22,12 +22,12 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.AbstractSearchTestCase; -import org.elasticsearch.search.RandomSearchRequestGenerator; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.rescore.QueryRescorerBuilder; @@ -48,19 +48,23 @@ public class SearchRequestTests extends AbstractSearchTestCase { @Override protected SearchRequest createSearchRequest() throws IOException { + SearchRequest request = super.createSearchRequest(); if (randomBoolean()) { - return super.createSearchRequest(); + return request; } //clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically. - SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong()); - RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder); - return searchRequest; + return SearchRequest.withLocalReduction(request, request.indices(), + randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong()); } - public void testClusterAliasValidation() { - expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0)); - expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1)); - SearchRequest searchRequest = new SearchRequest("", 0); + public void testWithLocalReduction() { + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(null, Strings.EMPTY_ARRAY, "", 0)); + SearchRequest request = new SearchRequest(); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, null, "", 0)); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, new String[]{null}, "", 0)); + expectThrows(NullPointerException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, null, 0)); + expectThrows(IllegalArgumentException.class, () -> SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", -1)); + SearchRequest searchRequest = SearchRequest.withLocalReduction(request, Strings.EMPTY_ARRAY, "", 0); assertNull(searchRequest.validate()); } @@ -72,10 +76,15 @@ public void testSerialization() throws Exception { assertNotSame(deserializedRequest, searchRequest); } - public void testClusterAliasSerialization() throws IOException { + public void testRandomVersionSerialization() throws IOException { SearchRequest searchRequest = createSearchRequest(); Version version = VersionUtils.randomVersion(random()); SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new, version); + if (version.before(Version.V_7_0_0)) { + assertTrue(deserializedRequest.isCcsMinimizeRoundtrips()); + } else { + assertEquals(searchRequest.isCcsMinimizeRoundtrips(), deserializedRequest.isCcsMinimizeRoundtrips()); + } if (version.before(Version.V_6_7_0)) { assertNull(deserializedRequest.getLocalClusterAlias()); assertAbsoluteStartMillisIsCurrentTime(deserializedRequest); @@ -93,6 +102,7 @@ public void testReadFromPre6_7_0() throws IOException { assertArrayEquals(new String[]{"index"}, searchRequest.indices()); assertNull(searchRequest.getLocalClusterAlias()); assertAbsoluteStartMillisIsCurrentTime(searchRequest); + assertTrue(searchRequest.isCcsMinimizeRoundtrips()); } } @@ -215,6 +225,7 @@ private SearchRequest mutate(SearchRequest searchRequest) { mutators.add(() -> mutation.searchType(randomValueOtherThan(searchRequest.searchType(), () -> randomFrom(SearchType.DFS_QUERY_THEN_FETCH, SearchType.QUERY_THEN_FETCH)))); mutators.add(() -> mutation.source(randomValueOtherThan(searchRequest.source(), this::createSearchSourceBuilder))); + mutators.add(() -> mutation.setCcsMinimizeRoundtrips(searchRequest.isCcsMinimizeRoundtrips() == false)); randomFrom(mutators).run(); return mutation; } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java index d02b712eaaef3..712d6a60440fe 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java @@ -73,7 +73,7 @@ public class SearchResponseMergerTests extends ESTestCase { @Before public void init() { - numResponses = randomIntBetween(2, 10); + numResponses = randomIntBetween(1, 10); executorService = Executors.newFixedThreadPool(numResponses); } @@ -87,7 +87,7 @@ private void addResponse(SearchResponseMerger searchResponseMerger, SearchRespon private void awaitResponsesAdded() throws InterruptedException { executorService.shutdown(); - executorService.awaitTermination(5, TimeUnit.SECONDS); + assertTrue(executorService.awaitTermination(5, TimeUnit.SECONDS)); } public void testMergeTookInMillis() throws InterruptedException { @@ -137,6 +137,7 @@ public void testMergeShardFailures() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse mergedResponse = merger.getMergedResponse(clusters); assertSame(clusters, mergedResponse.getClusters()); @@ -170,6 +171,7 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, merger.numResponses()); ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY).getShardFailures(); assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); } @@ -189,6 +191,7 @@ public void testMergeProfileResults() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse mergedResponse = merger.getMergedResponse(clusters); assertSame(clusters, mergedResponse.getClusters()); @@ -221,6 +224,7 @@ public void testMergeSuggestions() throws InterruptedException { addResponse(searchResponseMerger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); assertSame(clusters, mergedResponse.getClusters()); @@ -267,6 +271,7 @@ public void testMergeAggs() throws InterruptedException { addResponse(searchResponseMerger, searchResponse); } awaitResponsesAdded(); + assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters); assertSame(clusters, mergedResponse.getClusters()); @@ -334,7 +339,7 @@ public void testMergeSearchHits() throws InterruptedException { Iterator> indicesIterator = randomRealisticIndices(numIndices, numResponses).entrySet().iterator(); for (int i = 0; i < numResponses; i++) { Map.Entry entry = indicesIterator.next(); - String clusterAlias = entry.getKey().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? null : entry.getKey(); + String clusterAlias = entry.getKey(); Index[] indices = entry.getValue(); int total = randomIntBetween(1, 1000); expectedTotal += total; @@ -386,7 +391,7 @@ public void testMergeSearchHits() throws InterruptedException { } awaitResponsesAdded(); - + assertEquals(numResponses, searchResponseMerger.numResponses()); final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters); @@ -434,6 +439,33 @@ public void testMergeSearchHits() throws InterruptedException { } } + public void testMergeNoResponsesAdded() { + long currentRelativeTime = randomLong(); + final SearchTimeProvider timeProvider = new SearchTimeProvider(randomLong(), 0, () -> currentRelativeTime); + SearchResponseMerger merger = new SearchResponseMerger(0, 10, Integer.MAX_VALUE, timeProvider, flag -> null); + SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); + assertEquals(0, merger.numResponses()); + SearchResponse response = merger.getMergedResponse(clusters); + assertSame(clusters, response.getClusters()); + assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), response.getTook().millis()); + assertEquals(0, response.getTotalShards()); + assertEquals(0, response.getSuccessfulShards()); + assertEquals(0, response.getSkippedShards()); + assertEquals(0, response.getFailedShards()); + assertEquals(0, response.getNumReducePhases()); + assertFalse(response.isTimedOut()); + assertNotNull(response.getHits().getTotalHits()); + assertEquals(0, response.getHits().getTotalHits().value); + assertEquals(0, response.getHits().getHits().length); + assertEquals(TotalHits.Relation.EQUAL_TO, response.getHits().getTotalHits().relation); + assertNull(response.getScrollId()); + assertSame(InternalAggregations.EMPTY, response.getAggregations()); + assertNull(response.getSuggest()); + assertEquals(0, response.getProfileResults().size()); + assertNull(response.isTerminatedEarly()); + assertEquals(0, response.getShardFailures().length); + } + private static Tuple randomTrackTotalHits() { switch(randomIntBetween(0, 2)) { case 0: @@ -499,8 +531,11 @@ private static Map randomRealisticIndices(int numIndices, int n for (int i = 0; i < numClusters; i++) { Index[] indices = new Index[indicesNames.length]; for (int j = 0; j < indices.length; j++) { - //Realistically clusters have the same indices with same names, but different uuid - indices[j] = new Index(indicesNames[j], randomAlphaOfLength(10)); + String indexName = indicesNames[j]; + //Realistically clusters have the same indices with same names, but different uuid. Yet it can happen that the same cluster + //is registered twice with different aliases and searched multiple times as part of the same search request. + String indexUuid = frequently() ? randomAlphaOfLength(10) : indexName; + indices[j] = new Index(indexName, indexUuid); } String clusterAlias; if (frequently() || indicesPerCluster.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { @@ -551,10 +586,22 @@ public int compare(SearchHit a, SearchHit b) { } } } - int shardIdCompareTo = a.getShard().getShardId().compareTo(b.getShard().getShardId()); + SearchShardTarget aShard = a.getShard(); + SearchShardTarget bShard = b.getShard(); + int shardIdCompareTo = aShard.getShardId().compareTo(bShard.getShardId()); if (shardIdCompareTo != 0) { return shardIdCompareTo; } + int clusterAliasCompareTo = aShard.getClusterAlias().compareTo(bShard.getClusterAlias()); + if (clusterAliasCompareTo != 0) { + if (aShard.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + return 1; + } + if (bShard.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + return -1; + } + return clusterAliasCompareTo; + } return Integer.compare(a.docId(), b.docId()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java index f07be38765f66..18890e1339557 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java @@ -20,13 +20,11 @@ package org.elasticsearch.action.search; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -246,7 +244,8 @@ public void testToXContent() { new InternalSearchResponse( new SearchHits(hits, new TotalHits(100, TotalHits.Relation.EQUAL_TO), 1.5f), null, null, null, false, null, 1 ), - null, 0, 0, 0, 0, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(5, 3, 2)); + null, 0, 0, 0, 0, ShardSearchFailure.EMPTY_ARRAY, + new SearchResponse.Clusters(5, 3, 2)); StringBuilder expectedString = new StringBuilder(); expectedString.append("{"); { @@ -279,24 +278,18 @@ public void testToXContent() { public void testSerialization() throws IOException { SearchResponse searchResponse = createTestItem(false); - BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - searchResponse.writeTo(bytesStreamOutput); - try (StreamInput in = new NamedWriteableAwareStreamInput( - StreamInput.wrap(bytesStreamOutput.bytes().toBytesRef().bytes), namedWriteableRegistry)) { - SearchResponse serialized = new SearchResponse(); - serialized.readFrom(in); - if (searchResponse.getHits().getTotalHits() == null) { - assertNull(serialized.getHits().getTotalHits()); - } else { - assertEquals(searchResponse.getHits().getTotalHits().value, serialized.getHits().getTotalHits().value); - assertEquals(searchResponse.getHits().getTotalHits().relation, serialized.getHits().getTotalHits().relation); - } - assertEquals(searchResponse.getHits().getHits().length, serialized.getHits().getHits().length); - assertEquals(searchResponse.getNumReducePhases(), serialized.getNumReducePhases()); - assertEquals(searchResponse.getFailedShards(), serialized.getFailedShards()); - assertEquals(searchResponse.getTotalShards(), serialized.getTotalShards()); - assertEquals(searchResponse.getSkippedShards(), serialized.getSkippedShards()); - assertEquals(searchResponse.getClusters(), serialized.getClusters()); + SearchResponse deserialized = copyStreamable(searchResponse, namedWriteableRegistry, SearchResponse::new, Version.CURRENT); + if (searchResponse.getHits().getTotalHits() == null) { + assertNull(deserialized.getHits().getTotalHits()); + } else { + assertEquals(searchResponse.getHits().getTotalHits().value, deserialized.getHits().getTotalHits().value); + assertEquals(searchResponse.getHits().getTotalHits().relation, deserialized.getHits().getTotalHits().relation); } + assertEquals(searchResponse.getHits().getHits().length, deserialized.getHits().getHits().length); + assertEquals(searchResponse.getNumReducePhases(), deserialized.getNumReducePhases()); + assertEquals(searchResponse.getFailedShards(), deserialized.getFailedShards()); + assertEquals(searchResponse.getTotalShards(), deserialized.getTotalShards()); + assertEquals(searchResponse.getSkippedShards(), deserialized.getSkippedShards()); + assertEquals(searchResponse.getClusters(), deserialized.getClusters()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index 19bd76ec09da2..8fd75c5fd673d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -41,7 +42,7 @@ public void testLocalClusterAlias() { assertEquals(RestStatus.CREATED, indexResponse.status()); { - SearchRequest searchRequest = new SearchRequest("local", nowInMillis); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "local", nowInMillis); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -52,7 +53,7 @@ public void testLocalClusterAlias() { assertEquals("1", hit.getId()); } { - SearchRequest searchRequest = new SearchRequest("", nowInMillis); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", nowInMillis); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); SearchHit[] hits = searchResponse.getHits().getHits(); @@ -93,19 +94,19 @@ public void testAbsoluteStartMillis() { assertEquals(0, searchResponse.getTotalShards()); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(2, searchResponse.getHits().getTotalHits().value); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); searchRequest.indices(""); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(1, searchResponse.getHits().getTotalHits().value); assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex()); } { - SearchRequest searchRequest = new SearchRequest("", 0); + SearchRequest searchRequest = SearchRequest.withLocalReduction(new SearchRequest(), Strings.EMPTY_ARRAY, "", 0); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date"); rangeQuery.gte("1970-01-01"); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 1b99beee65e81..8a5859e200eac 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.search; +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; @@ -34,13 +36,24 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; @@ -320,169 +333,495 @@ public void close() { } } - private MockTransportService startTransport(String id, List knownNodes) { - return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool); - } - - public void testCollectSearchShards() throws Exception { - int numClusters = randomIntBetween(2, 10); + private MockTransportService[] startTransport(int numClusters, DiscoveryNode[] nodes, Map remoteIndices, + Settings.Builder settingsBuilder) { MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; - DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; - Map remoteIndicesByCluster = new HashMap<>(); - Settings.Builder builder = Settings.builder(); for (int i = 0; i < numClusters; i++) { List knownNodes = new CopyOnWriteArrayList<>(); - MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes); + MockTransportService remoteSeedTransport = RemoteClusterConnectionTests.startTransport("node_remote" + i, knownNodes, + Version.CURRENT, threadPool); mockTransportServices[i] = remoteSeedTransport; DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); knownNodes.add(remoteSeedNode); nodes[i] = remoteSeedNode; - builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); - remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); + settingsBuilder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); + remoteIndices.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); } + return mockTransportServices; + } + + private static SearchResponse emptySearchResponse() { + InternalSearchResponse response = new InternalSearchResponse(new SearchHits(new SearchHit[0], + new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), InternalAggregations.EMPTY, null, null, false, null, 1); + return new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + } + + public void testCCSRemoteReduceMergeFails() throws Exception { + int numClusters = randomIntBetween(2, 10); + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, remoteIndicesByCluster, builder); Settings settings = builder.build(); + boolean local = randomBoolean(); + OriginalIndices localIndices = local ? new OriginalIndices(new String[]{"index"}, SearchRequest.DEFAULT_INDICES_OPTIONS) : null; + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); + Function reduceContext = finalReduce -> null; + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.preference("null_target"); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference failure = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); + } + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(failure.get()); + //the intention here is not to test that we throw NPE, rather to trigger a situation that makes + //SearchResponseMerger#getMergedResponse fail unexpectedly and verify that the listener is properly notified with the NPE + assertThat(failure.get(), instanceOf(NullPointerException.class)); + assertEquals(0, service.getConnectionManager().size()); + } finally { + for (MockTransportService mockTransportService : mockTransportServices) { + mockTransportService.close(); + } + } + } - try { - try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - RemoteClusterService remoteClusterService = service.getRemoteClusterService(); - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicInteger skippedClusters = new AtomicInteger(); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertEquals(0, skippedClusters.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - assertEquals(1, shardsResponse.getNodes().length); - } + public void testCCSRemoteReduce() throws Exception { + int numClusters = randomIntBetween(2, 10); + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, remoteIndicesByCluster, builder); + Settings settings = builder.build(); + boolean local = randomBoolean(); + OriginalIndices localIndices = local ? new OriginalIndices(new String[]{"index"}, SearchRequest.DEFAULT_INDICES_OPTIONS) : null; + int totalClusters = numClusters + (local ? 1 : 0); + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); + Function reduceContext = finalReduce -> null; + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + { + SearchRequest searchRequest = new SearchRequest(); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference response = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference failure = new AtomicReference<>(); - AtomicInteger skippedClusters = new AtomicInteger(0); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertEquals(0, skippedClusters.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); - assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + awaitLatch(latch, 5, TimeUnit.SECONDS); + + SearchResponse searchResponse = response.get(); + assertEquals(0, searchResponse.getClusters().getSkipped()); + assertEquals(totalClusters, searchResponse.getClusters().getTotal()); + assertEquals(totalClusters, searchResponse.getClusters().getSuccessful()); + assertEquals(totalClusters + 1, searchResponse.getNumReducePhases()); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.preference("index_not_found"); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference failure = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + } - int numDisconnectedClusters = randomIntBetween(1, numClusters); - Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); - Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); - while (disconnectedNodes.size() < numDisconnectedClusters) { - int i = randomIntBetween(0, numClusters - 1); - if (disconnectedNodes.add(nodes[i])) { - assertTrue(disconnectedNodesIndices.add(i)); - } + int numDisconnectedClusters = randomIntBetween(1, numClusters); + Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); + Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); + while (disconnectedNodes.size() < numDisconnectedClusters) { + int i = randomIntBetween(0, numClusters - 1); + if (disconnectedNodes.add(nodes[i])) { + assertTrue(disconnectedNodesIndices.add(i)); } + } - CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (disconnectedNodes.remove(node)) { - disconnectedLatch.countDown(); - } + CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); + RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); } - }); - for (DiscoveryNode disconnectedNode : disconnectedNodes) { - service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); } + }); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); + } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicInteger skippedClusters = new AtomicInteger(0); - AtomicReference failure = new AtomicReference<>(); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertEquals(0, skippedClusters.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); - assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + { + SearchRequest searchRequest = new SearchRequest(); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference failure = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(r -> fail("no response expected"), failure::set), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + } - //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again - for (int i : disconnectedNodesIndices) { - RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again + for (int i : disconnectedNodesIndices) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + { + SearchRequest searchRequest = new SearchRequest(); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference response = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } + awaitLatch(latch, 5, TimeUnit.SECONDS); + + SearchResponse searchResponse = response.get(); + assertEquals(disconnectedNodesIndices.size(), searchResponse.getClusters().getSkipped()); + assertEquals(totalClusters, searchResponse.getClusters().getTotal()); + int successful = totalClusters - disconnectedNodesIndices.size(); + assertEquals(successful, searchResponse.getClusters().getSuccessful()); + assertEquals(successful == 0 ? 0 : successful + 1, searchResponse.getNumReducePhases()); + } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicInteger skippedClusters = new AtomicInteger(0); - AtomicReference> response = new AtomicReference<>(); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters - disconnectedNodesIndices.size(), map.size()); - assertEquals(skippedClusters.get(), disconnectedNodesIndices.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - if (disconnectedNodesIndices.contains(i)) { - assertFalse(map.containsKey(clusterAlias)); - } else { - assertNotNull(map.get(clusterAlias)); - } + service.clearAllRules(); + if (randomBoolean()) { + for (int i : disconnectedNodesIndices) { + if (randomBoolean()) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); } + + } + } + { + SearchRequest searchRequest = new SearchRequest(); + final CountDownLatch latch = new CountDownLatch(1); + SetOnce>> setOnce = new SetOnce<>(); + AtomicReference response = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>( + ActionListener.wrap(response::set, e -> fail("no failures expected")), latch); + TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteIndicesByCluster, timeProvider, reduceContext, + remoteClusterService, threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l))); + if (localIndices == null) { + assertNull(setOnce.get()); + } else { + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); + assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); + tuple.v2().onResponse(emptySearchResponse()); } + awaitLatch(latch, 5, TimeUnit.SECONDS); + + SearchResponse searchResponse = response.get(); + assertEquals(0, searchResponse.getClusters().getSkipped()); + assertEquals(totalClusters, searchResponse.getClusters().getTotal()); + assertEquals(totalClusters, searchResponse.getClusters().getSuccessful()); + assertEquals(totalClusters + 1, searchResponse.getNumReducePhases()); + } + assertEquals(0, service.getConnectionManager().size()); + } finally { + for (MockTransportService mockTransportService : mockTransportServices) { + mockTransportService.close(); + } + } + } - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); + public void testCollectSearchShards() throws Exception { + int numClusters = randomIntBetween(2, 10); + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + MockTransportService[] mockTransportServices = startTransport(numClusters, nodes, remoteIndicesByCluster, builder); + Settings settings = builder.build(); + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); + assertEquals(1, shardsResponse.getNodes().length); + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(0); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + } - service.clearAllRules(); - if (randomBoolean()) { - for (int i : disconnectedNodesIndices) { - if (randomBoolean()) { - RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); - } + int numDisconnectedClusters = randomIntBetween(1, numClusters); + Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); + Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); + while (disconnectedNodes.size() < numDisconnectedClusters) { + int i = randomIntBetween(0, numClusters - 1); + if (disconnectedNodes.add(nodes[i])) { + assertTrue(disconnectedNodesIndices.add(i)); + } + } + CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); + RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); } } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicInteger skippedClusters = new AtomicInteger(0); - AtomicReference> response = new AtomicReference<>(); - TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, - remoteIndicesByCluster, remoteClusterService, threadPool, - new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertEquals(0, skippedClusters.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); + }); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference failure = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + } + + //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again + for (int i : disconnectedNodesIndices) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters - disconnectedNodesIndices.size(), map.size()); + assertEquals(skippedClusters.get(), disconnectedNodesIndices.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + if (disconnectedNodesIndices.contains(i)) { + assertFalse(map.containsKey(clusterAlias)); + } else { assertNotNull(map.get(clusterAlias)); } } - assertEquals(0, service.getConnectionManager().size()); } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); + + service.clearAllRules(); + if (randomBoolean()) { + for (int i : disconnectedNodesIndices) { + if (randomBoolean()) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + assertNotNull(map.get(clusterAlias)); + } + } + assertEquals(0, service.getConnectionManager().size()); } finally { for (MockTransportService mockTransportService : mockTransportServices) { mockTransportService.close(); } } } + + public void testCreateSearchResponseMerger() { + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); + Function reduceContext = flag -> null; + { + SearchSourceBuilder source = new SearchSourceBuilder(); + assertEquals(-1, source.size()); + assertEquals(-1, source.from()); + assertNull(source.trackTotalHitsUpTo()); + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, reduceContext); + assertEquals(0, merger.from); + assertEquals(10, merger.size); + assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo); + assertEquals(0, source.from()); + assertEquals(10, source.size()); + assertNull(source.trackTotalHitsUpTo()); + } + { + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(null, timeProvider, reduceContext); + assertEquals(0, merger.from); + assertEquals(10, merger.size); + assertEquals(SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO, merger.trackTotalHitsUpTo); + } + { + SearchSourceBuilder source = new SearchSourceBuilder(); + int originalFrom = randomIntBetween(0, 1000); + source.from(originalFrom); + int originalSize = randomIntBetween(0, 1000); + source.size(originalSize); + int trackTotalHitsUpTo = randomIntBetween(0, Integer.MAX_VALUE); + source.trackTotalHitsUpTo(trackTotalHitsUpTo); + SearchResponseMerger merger = TransportSearchAction.createSearchResponseMerger(source, timeProvider, reduceContext); + assertEquals(0, source.from()); + assertEquals(originalFrom + originalSize, source.size()); + assertEquals(trackTotalHitsUpTo, (int)source.trackTotalHitsUpTo()); + assertEquals(originalFrom, merger.from); + assertEquals(originalSize, merger.size); + assertEquals(trackTotalHitsUpTo, merger.trackTotalHitsUpTo); + } + } + + public void testShouldMinimizeRoundtrips() throws Exception { + { + SearchRequest searchRequest = new SearchRequest(); + assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder()); + assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.scroll("5s"); + assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + SearchSourceBuilder source = new SearchSourceBuilder(); + searchRequest.source(source); + CollapseBuilder collapseBuilder = new CollapseBuilder("field"); + source.collapse(collapseBuilder); + collapseBuilder.setInnerHits(new InnerHitBuilder("inner")); + assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + { + SearchRequestTests searchRequestTests = new SearchRequestTests(); + searchRequestTests.setUp(); + SearchRequest searchRequest = searchRequestTests.createSearchRequest(); + searchRequest.scroll((Scroll)null); + SearchSourceBuilder source = searchRequest.source(); + if (source != null) { + CollapseBuilder collapse = source.collapse(); + if (collapse != null) { + collapse.setInnerHits(Collections.emptyList()); + } + } + searchRequest.setCcsMinimizeRoundtrips(true); + assertTrue(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + searchRequest.setCcsMinimizeRoundtrips(false); + assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index 430cd900660c3..948e29d5d67de 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -813,7 +813,7 @@ public void testInvalidEnum() throws IOException { assertEquals(0, input.available()); } - private void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException { + private static void assertEqualityAfterSerialize(TimeValue value, int expectedSize) throws IOException { BytesStreamOutput out = new BytesStreamOutput(); out.writeTimeValue(value); assertEquals(expectedSize, out.size()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 3f85d927e9295..6e9c2e4eaf320 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -62,10 +62,10 @@ public void testConnectAndExecuteRequest() throws Exception { ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); assertNotNull(clusterStateResponse); assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); - // also test a failure, there is no handler for search registered + // also test a failure, there is no handler for scroll registered ActionNotFoundTransportException ex = expectThrows(ActionNotFoundTransportException.class, - () -> client.prepareSearch().get()); - assertEquals("No handler for action [indices:data/read/search]", ex.getMessage()); + () -> client.prepareSearchScroll("").get()); + assertEquals("No handler for action [indices:data/read/scroll]", ex.getMessage()); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 3ec2506da244e..9eddac80a17c0 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.apache.lucene.search.TotalHits; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -29,6 +30,10 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,6 +52,10 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.mocksocket.MockServerSocket; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -130,6 +139,24 @@ public static MockTransportService startTransport( knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); } }); + newService.registerRequestHandler(SearchAction.NAME, ThreadPool.Names.SAME, SearchRequest::new, + (request, channel, task) -> { + if ("index_not_found".equals(request.preference())) { + channel.sendResponse(new IndexNotFoundException("index")); + return; + } + SearchHits searchHits; + if ("null_target".equals(request.preference())) { + searchHits = new SearchHits(new SearchHit[] {new SearchHit(0)}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1F); + } else { + searchHits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN); + } + InternalSearchResponse response = new InternalSearchResponse(searchHits, + InternalAggregations.EMPTY, null, null, false, null, 1); + SearchResponse searchResponse = new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); + channel.sendResponse(searchResponse); + }); newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, (request, channel, task) -> { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); diff --git a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java index 58dbe869b5c71..df554ea42de28 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java @@ -84,18 +84,11 @@ private RandomSearchRequestGenerator() {} * {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}. */ public static SearchRequest randomSearchRequest(Supplier randomSearchSourceBuilder) { - return randomSearchRequest(new SearchRequest(), randomSearchSourceBuilder); - } - - /** - * Set random fields to the provided search request. - * - * @param searchRequest the search request - * @param randomSearchSourceBuilder builds a random {@link SearchSourceBuilder}. You can use - * {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}. - */ - public static SearchRequest randomSearchRequest(SearchRequest searchRequest, Supplier randomSearchSourceBuilder) { + SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(true); + if (randomBoolean()) { + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + } if (randomBoolean()) { searchRequest.indices(generateRandomStringArray(10, 10, false, false)); } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index 1ebd18ccaa3ac..fa8172697287e 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -97,6 +97,9 @@ teardown: terms: field: f1.keyword + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} - match: { _shards.total: 5 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -115,6 +118,9 @@ teardown: terms: field: f1.keyword + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -134,6 +140,9 @@ teardown: terms: field: f1.keyword + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6} - match: { hits.hits.0._index: "my_remote_cluster:test_index"} @@ -152,6 +161,7 @@ teardown: terms: field: f1.keyword + - is_false: _clusters - match: { _shards.total: 2 } - match: { hits.total: 5} - match: { hits.hits.0._index: "local_index"} @@ -182,6 +192,9 @@ teardown: rest_total_hits_as_int: true index: test_remote_cluster:test_index + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 6 } - match: { hits.hits.0._index: "test_remote_cluster:test_index" } @@ -193,6 +206,9 @@ teardown: rest_total_hits_as_int: true index: "*_remote_cluster:test_ind*" + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} - match: { _shards.total: 6 } - match: { hits.total: 12 } @@ -205,6 +221,9 @@ teardown: rest_total_hits_as_int: true index: my_remote_cluster:aliased_test_index + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 3 } - match: { hits.total: 2 } - match: { hits.hits.0._source.filter_field: 1 } @@ -219,6 +238,9 @@ teardown: rest_total_hits_as_int: true index: my_remote_cluster:secure_alias # TODO make this a wildcard once + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - match: { _shards.total: 2 } - match: { hits.total: 1 } - is_true: hits.hits.0._source.secure diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml index 6875df0847d1a..0026df4978075 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml @@ -52,6 +52,9 @@ teardown: query: match_all: {} + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - set: {_scroll_id: scroll_id} - match: {hits.total: 6 } - length: {hits.hits: 4 } @@ -66,6 +69,7 @@ teardown: rest_total_hits_as_int: true body: { "scroll_id": "$scroll_id", "scroll": "1m"} + - is_false: _clusters - match: {hits.total: 6 } - length: {hits.hits: 2 } - match: {hits.hits.0._source.filter_field: 1 } @@ -100,6 +104,9 @@ teardown: query: match_all: {} + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - set: {_scroll_id: scroll_id} - match: {hits.total: 6 } - length: {hits.hits: 4 } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/60_skip_shards.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/60_skip_shards.yml index e7842db70d263..d74e82edca7f0 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/60_skip_shards.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/60_skip_shards.yml @@ -66,6 +66,7 @@ teardown: - do: headers: { Authorization: "Basic am9lOnMza3JpdA==" } search: + ccs_minimize_roundtrips: false rest_total_hits_as_int: true index: "skip_shards_index,my_remote_cluster:single_doc_index" pre_filter_shard_size: 1 @@ -83,6 +84,7 @@ teardown: - do: headers: { Authorization: "Basic am9lOnMza3JpdA==" } search: + ccs_minimize_roundtrips: false rest_total_hits_as_int: true index: "skip_shards_index,my_remote_cluster:single_doc_index" pre_filter_shard_size: 1