diff --git a/modules/percolator/src/test/java/org/elasticsearch/percolator/PercolatorQuerySearchIT.java b/modules/percolator/src/test/java/org/elasticsearch/percolator/PercolatorQuerySearchIT.java index f7701bfa96be4..453b1c888e6e2 100644 --- a/modules/percolator/src/test/java/org/elasticsearch/percolator/PercolatorQuerySearchIT.java +++ b/modules/percolator/src/test/java/org/elasticsearch/percolator/PercolatorQuerySearchIT.java @@ -36,13 +36,17 @@ import org.elasticsearch.index.query.MultiMatchQueryBuilder; import org.elasticsearch.index.query.Operator; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.smileBuilder; @@ -71,6 +75,14 @@ public class PercolatorQuerySearchIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(); + plugins.addAll(super.nodePlugins()); + plugins.add(PercolatorPlugin.class); + return plugins; + } + public void testPercolatorQuery() throws Exception { assertAcked(client().admin().indices().prepareCreate("test") .setMapping("id", "type=keyword", "field1", "type=keyword", "field2", "type=keyword", "query", "type=percolator") diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index b4d52fa418e10..530675dbae7db 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -43,13 +42,15 @@ final class DfsQueryPhase extends SearchPhase { private final ArraySearchPhaseResults queryResult; private final SearchPhaseController searchPhaseController; - private final AtomicArray dfsSearchResults; + private final List searchResults; + private final AggregatedDfs dfs; private final Function, SearchPhase> nextPhaseFactory; private final SearchPhaseContext context; private final SearchTransportService searchTransportService; private final SearchProgressListener progressListener; - DfsQueryPhase(AtomicArray dfsSearchResults, + DfsQueryPhase(List searchResults, + AggregatedDfs dfs, SearchPhaseController searchPhaseController, Function, SearchPhase> nextPhaseFactory, SearchPhaseContext context) { @@ -57,7 +58,8 @@ final class DfsQueryPhase extends SearchPhase { this.progressListener = context.getTask().getProgressListener(); this.queryResult = searchPhaseController.newSearchPhaseResults(progressListener, context.getRequest(), context.getNumShards()); this.searchPhaseController = searchPhaseController; - this.dfsSearchResults = dfsSearchResults; + this.searchResults = searchResults; + this.dfs = dfs; this.nextPhaseFactory = nextPhaseFactory; this.context = context; this.searchTransportService = context.getSearchTransport(); @@ -67,18 +69,16 @@ final class DfsQueryPhase extends SearchPhase { public void run() throws IOException { // TODO we can potentially also consume the actual per shard results from the initial phase here in the aggregateDfs // to free up memory early - final List resultList = dfsSearchResults.asList(); - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(resultList); final CountedCollector counter = new CountedCollector<>(queryResult::consumeResult, - resultList.size(), + searchResults.size(), () -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context); final SearchSourceBuilder sourceBuilder = context.getRequest().source(); - progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0); - for (final DfsSearchResult dfsResult : resultList) { + progressListener.notifyListShards(progressListener.searchShards(searchResults), sourceBuilder == null || sourceBuilder.size() != 0); + for (final DfsSearchResult dfsResult : searchResults) { final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); QuerySearchRequest querySearchRequest = new QuerySearchRequest(searchShardTarget.getOriginalIndices(), - dfsResult.getRequestId(), dfs); + dfsResult.getRequestId(), dfsResult.getShardSearchRequest(), dfs); final int shardIndex = dfsResult.getShardIndex(); searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(), new SearchActionListener(searchShardTarget, shardIndex) { diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 41d216072e4b2..980c244d70940 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -25,11 +25,14 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.RescoreDocIds; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.transport.Transport; @@ -50,16 +53,19 @@ final class FetchSearchPhase extends SearchPhase { private final Logger logger; private final SearchPhaseResults resultConsumer; private final SearchProgressListener progressListener; + private final AggregatedDfs aggregatedDfs; FetchSearchPhase(SearchPhaseResults resultConsumer, SearchPhaseController searchPhaseController, + AggregatedDfs aggregatedDfs, SearchPhaseContext context) { - this(resultConsumer, searchPhaseController, context, + this(resultConsumer, searchPhaseController, aggregatedDfs, context, (response, scrollId) -> new ExpandSearchPhase(context, response, scrollId)); } FetchSearchPhase(SearchPhaseResults resultConsumer, SearchPhaseController searchPhaseController, + AggregatedDfs aggregatedDfs, SearchPhaseContext context, BiFunction nextPhaseFactory) { super("fetch"); if (context.getNumShards() != resultConsumer.getNumShards()) { @@ -69,6 +75,7 @@ final class FetchSearchPhase extends SearchPhase { this.fetchResults = new AtomicArray<>(resultConsumer.getNumShards()); this.searchPhaseController = searchPhaseController; this.queryResults = resultConsumer.getAtomicArray(); + this.aggregatedDfs = aggregatedDfs; this.nextPhaseFactory = nextPhaseFactory; this.context = context; this.logger = context.getLogger(); @@ -144,7 +151,8 @@ private void innerRun() throws IOException { Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry, - lastEmittedDocPerShard, searchShardTarget.getOriginalIndices()); + lastEmittedDocPerShard, searchShardTarget.getOriginalIndices(), queryResult.getShardSearchRequest(), + queryResult.getRescoreDocIds()); executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), connection); } @@ -153,10 +161,12 @@ private void innerRun() throws IOException { } } - protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry, - ScoreDoc[] lastEmittedDocPerShard, OriginalIndices originalIndices) { + protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry, ScoreDoc[] lastEmittedDocPerShard, + OriginalIndices originalIndices, ShardSearchRequest shardSearchRequest, + RescoreDocIds rescoreDocIds) { final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null; - return new ShardFetchSearchRequest(originalIndices, queryId, entry, lastEmittedDoc); + return new ShardFetchSearchRequest(originalIndices, queryId, shardSearchRequest, entry, lastEmittedDoc, + rescoreDocIds, aggregatedDfs); } private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 0782fbb310b65..25d604758bd9f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -23,10 +23,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.transport.Transport; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; @@ -59,7 +61,10 @@ protected void executePhaseOnShard(final SearchShardIterator shardIt, final Shar @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { - return new DfsQueryPhase(results.getAtomicArray(), searchPhaseController, (queryResults) -> - new FetchSearchPhase(queryResults, searchPhaseController, context), context); + final List dfsSearchResults = results.getAtomicArray().asList(); + final AggregatedDfs aggregatedDfs = searchPhaseController.aggregateDfs(dfsSearchResults); + + return new DfsQueryPhase(dfsSearchResults, aggregatedDfs, searchPhaseController, (queryResults) -> + new FetchSearchPhase(queryResults, searchPhaseController, aggregatedDfs, context), context); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index d5060b728347d..f5a9d23928ff4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -70,6 +70,6 @@ protected void onShardGroupFailure(int shardIndex, Exception exc) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { - return new FetchSearchPhase(results, searchPhaseController, context); + return new FetchSearchPhase(results, searchPhaseController, null, context); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 53d8d266ea795..3b90b7c38766d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.IndicesRequest; @@ -282,6 +283,10 @@ public void writeTo(StreamOutput out) throws IOException { } } + static boolean keepStatesInContext(Version version) { + return version.before(Version.V_8_0_0); + } + public static void registerRequestHandler(TransportService transportService, SearchService searchService) { transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new, (request, channel, task) -> { @@ -306,7 +311,7 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new, (request, channel, task) -> - searchService.executeDfsPhase(request, (SearchShardTask) task, + searchService.executeDfsPhase(request, keepStatesInContext(channel.getVersion()), (SearchShardTask) task, new ChannelActionListener<>(channel, DFS_ACTION_NAME, request)) ); @@ -314,7 +319,7 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new, (request, channel, task) -> { - searchService.executeQueryPhase(request, (SearchShardTask) task, + searchService.executeQueryPhase(request, keepStatesInContext(channel.getVersion()), (SearchShardTask) task, new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request)); }); TransportActionProxy.registerProxyActionWithDynamicResponseType(transportService, QUERY_ACTION_NAME, diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 6571d60b0d3ed..9ecffb1571cc9 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -138,6 +138,7 @@ final class DefaultSearchContext extends SearchContext { private SearchContextAggregations aggregations; private SearchContextHighlight highlight; private SuggestionSearchContext suggest; + private List rescore; private Profilers profilers; private final Map searchExtBuilders = new HashMap<>(); @@ -155,8 +156,8 @@ final class DefaultSearchContext extends SearchContext { this.shardTarget = shardTarget; // SearchContexts use a BigArrays that can circuit break this.bigArrays = bigArrays.withCircuitBreaking(); - this.dfsResult = new DfsSearchResult(readerContext.id(), shardTarget); - this.queryResult = new QuerySearchResult(readerContext.id(), shardTarget); + this.dfsResult = new DfsSearchResult(readerContext.id(), shardTarget, request); + this.queryResult = new QuerySearchResult(readerContext.id(), shardTarget, request); this.fetchResult = new FetchSearchResult(readerContext.id(), shardTarget); this.indexShard = indexShard; this.indexService = indexService; @@ -164,7 +165,6 @@ final class DefaultSearchContext extends SearchContext { final Engine.Searcher engineSearcher = readerContext.engineSearcher(); this.searcher = new ContextIndexSearcher(engineSearcher.getIndexReader(), engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy()); - searcher.setAggregatedDfs(readerContext.aggregatedDfs()); this.relativeTimeSupplier = relativeTimeSupplier; this.timeout = timeout; queryShardContext = indexService.newQueryShardContext(request.shardId().id(), this.searcher, @@ -203,7 +203,7 @@ public void preProcess(boolean rewrite) { + "]. Scroll batch sizes cost as much memory as result windows so they are controlled by the [" + IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey() + "] index level setting."); } - if (rescore().isEmpty() == false) { + if (rescore != null) { if (sort != null) { throw new IllegalArgumentException("Cannot use [sort] option in conjunction with [rescore]."); } @@ -327,7 +327,6 @@ public ScrollContext scrollContext() { return readerContext.scrollContext(); } - @Override public SearchContextAggregations aggregations() { return aggregations; @@ -373,7 +372,18 @@ public void suggest(SuggestionSearchContext suggest) { @Override public List rescore() { - return readerContext.rescore(); + if (rescore == null) { + return List.of(); + } + return rescore; + } + + @Override + public void addRescore(RescoreContext rescore) { + if (this.rescore == null) { + this.rescore = new ArrayList<>(); + } + this.rescore.add(rescore); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/RescoreDocIds.java b/server/src/main/java/org/elasticsearch/search/RescoreDocIds.java new file mode 100644 index 0000000000000..7d908f53518b9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/RescoreDocIds.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +public final class RescoreDocIds implements Writeable { + public static final RescoreDocIds EMPTY = new RescoreDocIds(Map.of()); + + private final Map> docIds; + + public RescoreDocIds(Map> docIds) { + this.docIds = docIds; + } + + public RescoreDocIds(StreamInput in) throws IOException { + docIds = in.readMap(StreamInput::readVInt, i -> i.readSet(StreamInput::readVInt)); + } + + public Set getId(int index) { + return docIds.get(index); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(docIds, StreamOutput::writeVInt, (o, v) -> o.writeCollection(v, StreamOutput::writeVInt)); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java index 77af879831e02..b524fd331e1d4 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java +++ b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java @@ -19,9 +19,11 @@ package org.elasticsearch.search; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.transport.TransportResponse; @@ -40,6 +42,8 @@ public abstract class SearchPhaseResult extends TransportResponse { private SearchShardTarget searchShardTarget; private int shardIndex = -1; protected long requestId; + private ShardSearchRequest shardSearchRequest; + private RescoreDocIds rescoreDocIds = RescoreDocIds.EMPTY; protected SearchPhaseResult() { @@ -90,6 +94,23 @@ public QuerySearchResult queryResult() { */ public FetchSearchResult fetchResult() { return null; } + @Nullable + public ShardSearchRequest getShardSearchRequest() { + return shardSearchRequest; + } + + public void setShardSearchRequest(ShardSearchRequest shardSearchRequest) { + this.shardSearchRequest = shardSearchRequest; + } + + public RescoreDocIds getRescoreDocIds() { + return rescoreDocIds; + } + + public void setRescoreDocIds(RescoreDocIds rescoreDocIds) { + this.rescoreDocIds = rescoreDocIds; + } + @Override public void writeTo(StreamOutput out) throws IOException { // TODO: this seems wrong, SearchPhaseResult should have a writeTo? diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index e856e8a12a374..ebddbba761e79 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -85,6 +85,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalScrollSearchRequest; +import org.elasticsearch.search.internal.LegacyReaderContext; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -308,7 +309,8 @@ protected void doClose() { keepAliveReaper.cancel(); } - public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { + public void executeDfsPhase(ShardSearchRequest request, boolean keepStatesInContext, + SearchShardTask task, ActionListener listener) { rewriteShardRequest(request, ActionListener.wrap( // fork the execution in the search thread pool and wraps the searcher // to execute the query @@ -317,7 +319,7 @@ public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, Ac context.wrapSearcher().execute(() -> { final SearchPhaseResult result; try { - result = executeDfsPhase(context, task); + result = executeDfsPhase(context, keepStatesInContext, task); } catch (Exception exc) { listener.onFailure(exc); return; @@ -332,9 +334,9 @@ public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, Ac }, listener::onFailure)); } - private DfsSearchResult executeDfsPhase(SearchRewriteContext rewriteContext, SearchShardTask task) { - final ReaderContext reader = createAndPutReaderContext(rewriteContext); - try (SearchContext context = createContext(reader, reader.request(), task, true)) { + private DfsSearchResult executeDfsPhase(SearchRewriteContext rewriteContext, boolean keepStatesInContext, SearchShardTask task) { + final ReaderContext reader = createAndPutReaderContext(rewriteContext, keepStatesInContext); + try (SearchContext context = createContext(reader, rewriteContext.request, task, true)) { dfsPhase.execute(context); return context.dfsResult(); } catch (Exception e) { @@ -357,7 +359,8 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } } - public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { + public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInContext, + SearchShardTask task, ActionListener listener) { assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 : "empty responses require more than one shard"; rewriteShardRequest(request, ActionListener.wrap( @@ -375,7 +378,7 @@ && canRewriteToMatchNone(rewritten.source()) context.wrapSearcher().execute(() -> { final SearchPhaseResult result; try { - result = executeQueryPhase(context, task); + result = executeQueryPhase(context, keepStatesInContext, task); } catch (Exception exc) { listener.onFailure(exc); return; @@ -405,10 +408,11 @@ private void runAsync(long id, Supplier executable, ActionListener lis getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } - private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws Exception { - final ReaderContext reader = createAndPutReaderContext(rewriteContext); + private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, boolean keepStatesInContext, + SearchShardTask task) throws Exception { + final ReaderContext reader = createAndPutReaderContext(rewriteContext, keepStatesInContext); final ShardSearchRequest request = rewriteContext.request; - try (SearchContext context = createContext(reader, reader.request(), task, true)) { + try (SearchContext context = createContext(reader, request, task, true)) { final long afterQueryTime; try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { loadOrExecuteQueryPhase(request, context); @@ -449,13 +453,17 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final ReaderContext reader = findReaderContext(request.id()); - try (SearchContext context = createContext(reader, reader.request(), task, false); + final LegacyReaderContext reader = (LegacyReaderContext) findReaderContext(request.id()); + try (SearchContext context = createContext(reader, reader.getShardSearchRequest(null), task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); + context.searcher().setAggregatedDfs(reader.getAggregatedDfs(null)); processScroll(request, reader, context); queryPhase.execute(context); executor.success(); + final RescoreDocIds rescoreDocIds = context.rescoreDocIds(); + context.queryResult().setRescoreDocIds(rescoreDocIds); + reader.setRescoreDocIds(rescoreDocIds); return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); @@ -468,8 +476,8 @@ public void executeQueryPhase(InternalScrollSearchRequest request, public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { final ReaderContext reader = findReaderContext(request.id()); - reader.aggregatedDfs(request.dfs()); - try (SearchContext context = createContext(reader, reader.request(), task, true); + reader.setAggregatedDfs(request.dfs()); + try (SearchContext context = createContext(reader, reader.getShardSearchRequest(request.shardSearchRequest()), task, true); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); context.searcher().setAggregatedDfs(request.dfs()); @@ -479,6 +487,9 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, freeReaderContext(reader.id()); } executor.success(); + final RescoreDocIds rescoreDocIds = context.rescoreDocIds(); + context.queryResult().setRescoreDocIds(rescoreDocIds); + reader.setRescoreDocIds(rescoreDocIds); return context.queryResult(); } catch (Exception e) { logger.trace("Query phase failed", e); @@ -510,9 +521,11 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { - final ReaderContext reader = findReaderContext(request.id()); - try (SearchContext context = createContext(reader, reader.request(), task, false); + final LegacyReaderContext reader = (LegacyReaderContext) findReaderContext(request.id()); + try (SearchContext context = createContext(reader, reader.getShardSearchRequest(null), task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + context.assignRescoreDocIds(reader.getRescoreDocIds(null)); + context.searcher().setAggregatedDfs(reader.getAggregatedDfs(null)); reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); processScroll(request, reader, context); queryPhase.execute(context); @@ -530,11 +543,14 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.id(), () -> { final ReaderContext reader = findReaderContext(request.id()); - try (SearchContext context = createContext(reader, reader.request(), task, false)) { + final ShardSearchRequest shardSearchRequest = reader.getShardSearchRequest(request.getShardSearchRequest()); + try (SearchContext context = createContext(reader, shardSearchRequest, task, false)) { reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); if (request.lastEmittedDoc() != null) { context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } + context.assignRescoreDocIds(reader.getRescoreDocIds(request.getRescoreDocIds())); + context.searcher().setAggregatedDfs(reader.getAggregatedDfs(request.getAggregatedDfs())); context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) { fetchPhase.execute(context); @@ -560,7 +576,7 @@ private ReaderContext findReaderContext(long id) throws SearchContextMissingExce return reader; } - final ReaderContext createAndPutReaderContext(SearchRewriteContext rewriteContext) { + final ReaderContext createAndPutReaderContext(SearchRewriteContext rewriteContext, boolean keepStatesInContext) { final ShardSearchRequest request = rewriteContext.request; final SearchOperationListener searchOperationListener = rewriteContext.shard.getSearchOperationListener(); Engine.Searcher engineSearcher = rewriteContext.searcher; @@ -572,7 +588,11 @@ final ReaderContext createAndPutReaderContext(SearchRewriteContext rewriteContex maxOpenScrollContext + "]. " + "This limit can be set by changing the [" + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); } - readerContext = new ReaderContext(idGenerator.incrementAndGet(), rewriteContext.shard, engineSearcher, request); + if (keepStatesInContext || request.scroll() != null) { + readerContext = new LegacyReaderContext(idGenerator.incrementAndGet(), rewriteContext.shard, engineSearcher, request); + } else { + readerContext = new ReaderContext(idGenerator.incrementAndGet(), rewriteContext.shard, engineSearcher); + } engineSearcher = null; final ReaderContext finalReaderContext = readerContext; searchOperationListener.onNewReaderContext(finalReaderContext); @@ -605,7 +625,7 @@ final SearchContext createContext(ReaderContext reader, ShardSearchRequest reque if (request.scroll() != null) { context.scrollContext().scroll = request.scroll(); } - parseSource(reader, context, request.source(), includeAggregations); + parseSource(context, request.source(), includeAggregations); // if the from and size are still not set, default them if (context.from() == -1) { @@ -642,7 +662,7 @@ final SearchContext createContext(ReaderContext reader, ShardSearchRequest reque public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) { IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().getId()); Engine.Searcher engineSearcher = indexShard.acquireSearcher("search"); - try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, engineSearcher, request)) { + try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, engineSearcher)) { engineSearcher = null; // transfer ownership to readerContext return createSearchContext(readerContext, request, timeout); } finally { @@ -724,8 +744,7 @@ private void processFailure(ReaderContext context, Exception e) { } } - private void parseSource(ReaderContext reader, DefaultSearchContext context, - SearchSourceBuilder source, boolean includeAggregations) throws SearchException { + private void parseSource(DefaultSearchContext context, SearchSourceBuilder source, boolean includeAggregations) { // nothing to parse... if (source == null) { return; @@ -796,10 +815,10 @@ private void parseSource(ReaderContext reader, DefaultSearchContext context, throw new SearchException(shardTarget, "failed to create SuggestionSearchContext", e); } } - if (source.rescores() != null && reader.rescore().isEmpty()) { + if (source.rescores() != null) { try { for (RescorerBuilder rescore : source.rescores()) { - reader.addRescore(rescore.buildContext(queryShardContext)); + context.addRescore(rescore.buildContext(queryShardContext)); } } catch (IOException e) { throw new SearchException(shardTarget, "failed to create RescoreSearchContext", e); diff --git a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java index 4d245e2bcb002..85d7261b2615b 100644 --- a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java @@ -25,11 +25,13 @@ import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.TermStatistics; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.internal.ShardSearchRequest; import java.io.IOException; @@ -58,11 +60,15 @@ public DfsSearchResult(StreamInput in) throws IOException { fieldStatistics = readFieldStats(in); maxDoc = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); + } } - public DfsSearchResult(long id, SearchShardTarget shardTarget) { + public DfsSearchResult(long id, SearchShardTarget shardTarget, ShardSearchRequest shardSearchRequest) { this.setSearchShardTarget(shardTarget); this.requestId = id; + setShardSearchRequest(shardSearchRequest); } public DfsSearchResult maxDoc(int maxDoc) { @@ -97,7 +103,7 @@ public ObjectObjectHashMap fieldStatistics() { return fieldStatistics; } - @Override + @Override public void writeTo(StreamOutput out) throws IOException { out.writeLong(requestId); out.writeVInt(terms.length); @@ -108,6 +114,9 @@ public void writeTo(StreamOutput out) throws IOException { writeTermStats(out, termStatistics); writeFieldStats(out, fieldStatistics); out.writeVInt(maxDoc); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(getShardSearchRequest()); + } } public static void writeFieldStats(StreamOutput out, ObjectObjectHashMap rescore; - private final List onCloses = new CopyOnWriteArrayList<>(); - public ReaderContext(long id, IndexShard indexShard, Engine.Searcher engineSearcher, ShardSearchRequest request) { + public ReaderContext(long id, IndexShard indexShard, Engine.Searcher engineSearcher) { super("reader_context"); this.id = id; this.indexShard = indexShard; this.engineSearcher = engineSearcher; - this.request = request; - if (request.scroll() != null) { - this.scrollContext = new ScrollContext(); - } else { - this.scrollContext = null; - } } @Override @@ -106,22 +93,6 @@ public String source() { return engineSearcher.source(); } - public ShardSearchRequest request() { - return request; - } - - public ScrollContext scrollContext() { - return scrollContext; - } - - public AggregatedDfs aggregatedDfs() { - return aggregatedDfs; - } - - public void aggregatedDfs(AggregatedDfs aggregatedDfs) { - this.aggregatedDfs = aggregatedDfs; - } - public void accessed(long accessTime) { this.lastAccessTime = accessTime; } @@ -138,18 +109,28 @@ public void keepAlive(long keepAlive) { this.keepAlive = keepAlive; } - public List rescore() { - if (rescore == null) { - return Collections.emptyList(); - } else { - return Collections.unmodifiableList(rescore); - } + // BWC + public ShardSearchRequest getShardSearchRequest(ShardSearchRequest other) { + return Objects.requireNonNull(other); } - public void addRescore(RescoreContext rescore) { - if (this.rescore == null) { - this.rescore = new ArrayList<>(); - } - this.rescore.add(rescore); + public ScrollContext scrollContext() { + return null; + } + + public AggregatedDfs getAggregatedDfs(AggregatedDfs other) { + return other; + } + + public void setAggregatedDfs(AggregatedDfs aggregatedDfs) { + + } + + public RescoreDocIds getRescoreDocIds(RescoreDocIds other) { + return Objects.requireNonNull(other); + } + + public void setRescoreDocIds(RescoreDocIds rescoreDocIds) { + } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 0a1fc3822ad42..378e34ca87a13 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.search.RescoreDocIds; import org.elasticsearch.search.SearchExtBuilder; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.SearchContextAggregations; @@ -58,8 +59,10 @@ import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.suggest.SuggestionSearchContext; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -156,6 +159,38 @@ public InnerHitsContext innerHits() { */ public abstract List rescore(); + public abstract void addRescore(RescoreContext rescore); + + public final RescoreDocIds rescoreDocIds() { + final List rescore = rescore(); + if (rescore == null) { + return RescoreDocIds.EMPTY; + } + Map> rescoreDocIds = null; + for (int i = 0; i < rescore.size(); i++) { + final Set docIds = rescore.get(i).getRescoredDocs(); + if (docIds != null && docIds.isEmpty() == false) { + if (rescoreDocIds == null) { + rescoreDocIds = new HashMap<>(); + } + rescoreDocIds.put(i, docIds); + } + } + return rescoreDocIds == null ? RescoreDocIds.EMPTY : new RescoreDocIds(rescoreDocIds); + } + + public final void assignRescoreDocIds(RescoreDocIds rescoreDocIds) { + final List rescore = rescore(); + if (rescore != null) { + for (int i = 0; i < rescore.size(); i++) { + final Set docIds = rescoreDocIds.getId(i); + if (docIds != null) { + rescore.get(i).setRescoredDocs(docIds); + } + } + } + } + public abstract boolean hasScriptFields(); public abstract ScriptFieldsContext scriptFields(); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java index d3919ec3aba48..55ab6088dd822 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java @@ -19,14 +19,17 @@ package org.elasticsearch.search.query; +import org.elasticsearch.Version; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.dfs.AggregatedDfs; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; @@ -36,18 +39,15 @@ public class QuerySearchRequest extends TransportRequest implements IndicesRequest { - private long id; + private final long id; + private final AggregatedDfs dfs; + private final OriginalIndices originalIndices; + private final ShardSearchRequest shardSearchRequest; - private AggregatedDfs dfs; - - private OriginalIndices originalIndices; - - public QuerySearchRequest() { - } - - public QuerySearchRequest(OriginalIndices originalIndices, long id, AggregatedDfs dfs) { + public QuerySearchRequest(OriginalIndices originalIndices, long id, ShardSearchRequest shardSearchRequest, AggregatedDfs dfs) { this.id = id; this.dfs = dfs; + this.shardSearchRequest = shardSearchRequest; this.originalIndices = originalIndices; } @@ -56,6 +56,11 @@ public QuerySearchRequest(StreamInput in) throws IOException { id = in.readLong(); dfs = new AggregatedDfs(in); originalIndices = OriginalIndices.readOriginalIndices(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.shardSearchRequest = in.readOptionalWriteable(ShardSearchRequest::new); + } else { + this.shardSearchRequest = null; + } } @Override @@ -64,6 +69,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(id); dfs.writeTo(out); OriginalIndices.writeOriginalIndices(originalIndices, out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(shardSearchRequest); + } } public long id() { @@ -74,6 +82,11 @@ public AggregatedDfs dfs() { return dfs; } + @Nullable + public ShardSearchRequest shardSearchRequest() { + return shardSearchRequest; + } + @Override public String[] indices() { return originalIndices.indices(); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 8c62d229fe766..87ec31e9862e0 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.RescoreDocIds; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.Aggregations; @@ -33,6 +34,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.suggest.Suggest; @@ -82,10 +84,11 @@ public QuerySearchResult(StreamInput in) throws IOException { } } - public QuerySearchResult(long id, SearchShardTarget shardTarget) { + public QuerySearchResult(long id, SearchShardTarget shardTarget, ShardSearchRequest shardSearchRequest) { this.requestId = id; setSearchShardTarget(shardTarget); isNull = false; + setShardSearchRequest(shardSearchRequest); } private QuerySearchResult(boolean isNull) { @@ -336,6 +339,10 @@ public void readFromWithId(long id, StreamInput in) throws IOException { hasProfileResults = profileShardResults != null; serviceTimeEWMA = in.readZLong(); nodeQueueSize = in.readInt(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); + setRescoreDocIds(new RescoreDocIds(in)); + } } @Override @@ -388,6 +395,10 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeOptionalWriteable(profileShardResults); out.writeZLong(serviceTimeEWMA); out.writeInt(nodeQueueSize); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(getShardSearchRequest()); + getRescoreDocIds().writeTo(out); + } } public TotalHits getTotalHits() { diff --git a/server/src/main/java/org/elasticsearch/search/rescore/RescoreContext.java b/server/src/main/java/org/elasticsearch/search/rescore/RescoreContext.java index 4f44af6321791..1ec5ecd33dc1a 100644 --- a/server/src/main/java/org/elasticsearch/search/rescore/RescoreContext.java +++ b/server/src/main/java/org/elasticsearch/search/rescore/RescoreContext.java @@ -66,6 +66,10 @@ public boolean isRescored(int docId) { return rescoredDocs.contains(docId); } + public Set getRescoredDocs() { + return rescoredDocs; + } + /** * Returns queries associated with the rescorer */ diff --git a/server/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java b/server/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java index a85f63abd0967..5f826d7168e86 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java @@ -59,7 +59,7 @@ public void testCollect() throws InterruptedException { case 1: state.add(1); executor.execute(() -> { - DfsSearchResult dfsSearchResult = new DfsSearchResult(shardID, null); + DfsSearchResult dfsSearchResult = new DfsSearchResult(shardID, null, null); dfsSearchResult.setShardIndex(shardID); dfsSearchResult.setSearchShardTarget(new SearchShardTarget("foo", new ShardId("bar", "baz", shardID), null, OriginalIndices.NONE)); diff --git a/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java index 8a8e28e15f20d..e791389413c68 100644 --- a/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java @@ -46,7 +46,7 @@ public class DfsQueryPhaseTests extends ESTestCase { private static DfsSearchResult newSearchResult(int shardIndex, long requestId, SearchShardTarget target) { - DfsSearchResult result = new DfsSearchResult(requestId, target); + DfsSearchResult result = new DfsSearchResult(requestId, target, null); result.setShardIndex(shardIndex); return result; } @@ -67,7 +67,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest SearchActionListener listener) { if (request.id() == 1) { QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore( new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]); @@ -75,7 +75,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest listener.onResponse(queryResult); } else if (request.id() == 2) { QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node2", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore( new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]); @@ -88,7 +88,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest }; MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); mockSearchPhaseContext.searchTransport = searchTransportService; - DfsQueryPhase phase = new DfsQueryPhase(results, controller, + DfsQueryPhase phase = new DfsQueryPhase(results.asList(), null, controller, (response) -> new SearchPhase("test") { @Override public void run() throws IOException { @@ -127,7 +127,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest SearchActionListener listener) { if (request.id() == 1) { QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs( new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]); @@ -142,7 +142,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest }; MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); mockSearchPhaseContext.searchTransport = searchTransportService; - DfsQueryPhase phase = new DfsQueryPhase(results, controller, + DfsQueryPhase phase = new DfsQueryPhase(results.asList(), null, controller, (response) -> new SearchPhase("test") { @Override public void run() throws IOException { @@ -184,7 +184,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest SearchActionListener listener) { if (request.id() == 1) { QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore( new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]); @@ -199,7 +199,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest }; MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); mockSearchPhaseContext.searchTransport = searchTransportService; - DfsQueryPhase phase = new DfsQueryPhase(results, controller, + DfsQueryPhase phase = new DfsQueryPhase(results.asList(), null, controller, (response) -> new SearchPhase("test") { @Override public void run() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 5eec29dbf8039..8e70c95b6a0d1 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -70,7 +70,7 @@ public void testShortcutQueryAndFetchOptimization() { numHits = 0; } - FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, + FetchSearchPhase phase = new FetchSearchPhase(results, controller, null, mockSearchPhaseContext, (searchResponse, scrollId) -> new SearchPhase("test") { @Override public void run() { @@ -96,14 +96,15 @@ public void testFetchTwoDocument() { ArraySearchPhaseResults results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set queryResult.setShardIndex(0); results.consumeResult(queryResult); - queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE)); + queryResult = new QuerySearchResult( + 321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); @@ -126,7 +127,7 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe listener.onResponse(fetchResult); } }; - FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, + FetchSearchPhase phase = new FetchSearchPhase(results, controller, null, mockSearchPhaseContext, (searchResponse, scrollId) -> new SearchPhase("test") { @Override public void run() { @@ -154,14 +155,15 @@ public void testFailFetchOneDoc() { controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set queryResult.setShardIndex(0); results.consumeResult(queryResult); - queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE)); + queryResult = new QuerySearchResult( + 321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); @@ -183,7 +185,7 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe } }; - FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, + FetchSearchPhase phase = new FetchSearchPhase(results, controller, null, mockSearchPhaseContext, (searchResponse, scrollId) -> new SearchPhase("test") { @Override public void run() { @@ -216,7 +218,7 @@ public void testFetchDocsConcurrently() throws InterruptedException { mockSearchPhaseContext.getRequest(), numHits); for (int i = 0; i < numHits; i++) { QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(i+1, i)}), i), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set @@ -236,7 +238,7 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe } }; CountDownLatch latch = new CountDownLatch(1); - FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, + FetchSearchPhase phase = new FetchSearchPhase(results, controller, null, mockSearchPhaseContext, (searchResponse, scrollId) -> new SearchPhase("test") { @Override public void run() { @@ -273,14 +275,15 @@ public void testExceptionFailsPhase() { controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set queryResult.setShardIndex(0); results.consumeResult(queryResult); - queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE)); + queryResult = new QuerySearchResult( + 321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); @@ -306,7 +309,7 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe listener.onResponse(fetchResult); } }; - FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, + FetchSearchPhase phase = new FetchSearchPhase(results, controller, null, mockSearchPhaseContext, (searchResponse, scrollId) -> new SearchPhase("test") { @Override public void run() { @@ -329,14 +332,15 @@ public void testCleanupIrrelevantContexts() { // contexts that are not fetched s controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2); int resultSetSize = 1; QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set queryResult.setShardIndex(0); results.consumeResult(queryResult); - queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE)); + queryResult = new QuerySearchResult( + 321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE), null); queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); @@ -357,7 +361,7 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe listener.onResponse(fetchResult); } }; - FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, + FetchSearchPhase phase = new FetchSearchPhase(results, controller, null, mockSearchPhaseContext, (searchResponse, scrollId) -> new SearchPhase("test") { @Override public void run() { diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index f49d3a69caca0..256340ef9b1d4 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -230,7 +230,7 @@ private static AtomicArray generateQueryResults(int nShards, String clusterAlias = randomBoolean() ? null : "remote"; SearchShardTarget searchShardTarget = new SearchShardTarget("", new ShardId("", "", shardIndex), clusterAlias, OriginalIndices.NONE); - QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex, searchShardTarget); + QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex, searchShardTarget, null); final TopDocs topDocs; float maxScore = 0; if (searchHitsSize == 0) { @@ -366,7 +366,7 @@ private void consumerTestCase(int numEmptyResponses) { } QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new ShardId("a", "b", 0), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), new DocValueFormat[0]); InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW, @@ -375,7 +375,7 @@ private void consumerTestCase(int numEmptyResponses) { result.setShardIndex(0); consumer.consumeResult(result); - result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); + result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), new DocValueFormat[0]); aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW, @@ -384,7 +384,7 @@ private void consumerTestCase(int numEmptyResponses) { result.setShardIndex(2); consumer.consumeResult(result); - result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE)); + result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), new DocValueFormat[0]); aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW, @@ -453,7 +453,7 @@ public void testConsumerConcurrently() throws InterruptedException { int number = randomIntBetween(1, 1000); max.updateAndGet(prev -> Math.max(prev, number)); QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new ShardId("a", "b", id), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore( new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(0, number)}), number), new DocValueFormat[0]); @@ -497,7 +497,7 @@ public void testConsumerOnlyAggs() { int number = randomIntBetween(1, 1000); max.updateAndGet(prev -> Math.max(prev, number)); QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), number), new DocValueFormat[0]); InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number, @@ -535,7 +535,7 @@ public void testConsumerOnlyHits() { int number = randomIntBetween(1, 1000); max.updateAndGet(prev -> Math.max(prev, number)); QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(0, number)}), number), new DocValueFormat[0]); result.setShardIndex(i); @@ -602,7 +602,7 @@ public void testReduceTopNWithFromOffset() { int score = 100; for (int i = 0; i < 4; i++) { QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); ScoreDoc[] docs = new ScoreDoc[3]; for (int j = 0; j < docs.length; j++) { docs[j] = new ScoreDoc(0, score--); @@ -644,7 +644,7 @@ public void testConsumerSortByField() { FieldDoc[] fieldDocs = {new FieldDoc(0, Float.NaN, new Object[]{number})}; TopDocs topDocs = new TopFieldDocs(new TotalHits(1, Relation.EQUAL_TO), fieldDocs, sortFields); QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), docValueFormats); result.setShardIndex(i); result.size(size); @@ -682,7 +682,7 @@ public void testConsumerFieldCollapsing() { FieldDoc[] fieldDocs = {new FieldDoc(0, Float.NaN, values)}; TopDocs topDocs = new CollapseTopFieldDocs("field", new TotalHits(1, Relation.EQUAL_TO), fieldDocs, sortFields, values); QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), docValueFormats); result.setShardIndex(i); result.size(size); @@ -715,7 +715,7 @@ public void testConsumerSuggestions() { int maxScoreCompletion = -1; for (int i = 0; i < expectedNumResults; i++) { QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); List>> suggestions = new ArrayList<>(); { @@ -841,7 +841,7 @@ public void onReduce(List shards, TotalHits totalHits, InternalAggr int number = randomIntBetween(1, 1000); max.updateAndGet(prev -> Math.max(prev, number)); QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new ShardId("a", "b", id), - null, OriginalIndices.NONE)); + null, OriginalIndices.NONE), null); result.topDocs(new TopDocsAndMaxScore( new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[]{new ScoreDoc(0, number)}), number), new DocValueFormat[0]); diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 198daf0f41a11..c6e41d805657a 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.LegacyReaderContext; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.rescore.RescoreContext; @@ -119,7 +120,7 @@ public void testPreProcess() throws Exception { SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); - ReaderContext readerWithoutScroll = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); + ReaderContext readerWithoutScroll = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher); DefaultSearchContext contextWithoutScroll = new DefaultSearchContext(readerWithoutScroll, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null); contextWithoutScroll.from(300); @@ -134,7 +135,7 @@ public void testPreProcess() throws Exception { // resultWindow greater than maxResultWindow and scrollContext isn't null when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMillis(randomInt(1000)))); - ReaderContext readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); + ReaderContext readerContext = new LegacyReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); DefaultSearchContext context1 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null); context1.from(300); @@ -151,7 +152,7 @@ public void testPreProcess() throws Exception { RescoreContext rescoreContext = mock(RescoreContext.class); when(rescoreContext.getWindowSize()).thenReturn(500); - readerContext.addRescore(rescoreContext); + context1.addRescore(rescoreContext); exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess(false)); assertThat(exception.getMessage(), equalTo("Cannot use [sort] option in conjunction with [rescore].")); @@ -166,7 +167,7 @@ public void testPreProcess() throws Exception { + "] index level setting.")); readerContext.close(); - readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); + readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher); // rescore is null but sliceBuilder is not null DefaultSearchContext context2 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null); @@ -196,7 +197,7 @@ public void testPreProcess() throws Exception { when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]); readerContext.close(); - readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest); + readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher); DefaultSearchContext context4 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null); context4.sliceBuilder(new SliceBuilder(1,2)).parsedQuery(parsedQuery).preProcess(false); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index bce588e69adb0..bed5f018a579c 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -317,6 +317,7 @@ public void onFailure(Exception e) { new ShardSearchRequest(OriginalIndices.NONE, useScroll ? scrollSearchRequest : searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + true, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), result); SearchPhaseResult searchPhaseResult = result.get(); IntArrayList intCursors = new IntArrayList(1); @@ -369,7 +370,7 @@ public void testTimeout() throws IOException { new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); - try (ReaderContext reader = createReaderContext(indexShard, requestWithDefaultTimeout); + try (ReaderContext reader = createReaderContext(indexShard); SearchContext contextWithDefaultTimeout = service.createContext(reader, requestWithDefaultTimeout, null, randomBoolean())) { // the search context should inherit the default timeout assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); @@ -384,7 +385,7 @@ public void testTimeout() throws IOException { 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); - try (ReaderContext reader = createReaderContext(indexShard, requestWithCustomTimeout); + try (ReaderContext reader = createReaderContext(indexShard); SearchContext context = service.createContext(reader, requestWithCustomTimeout, null, randomBoolean())) { // the search context should inherit the query timeout assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds))); @@ -410,12 +411,12 @@ public void testMaxDocvalueFieldsSearch() throws IOException { } final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); - try (ReaderContext reader = createReaderContext(indexShard, request); + try (ReaderContext reader = createReaderContext(indexShard); SearchContext context = service.createContext(reader, request, null, randomBoolean())) { assertNotNull(context); } searchSourceBuilder.docValueField("one_field_too_much"); - try (ReaderContext reader = createReaderContext(indexShard, request)) { + try (ReaderContext reader = createReaderContext(indexShard)) { IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> service.createContext(reader, request, null, randomBoolean())); assertEquals( @@ -447,7 +448,7 @@ public void testMaxScriptFieldsSearch() throws IOException { final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); - try(ReaderContext reader = createReaderContext(indexShard, request)) { + try(ReaderContext reader = createReaderContext(indexShard)) { try (SearchContext context = service.createContext(reader, request, null, randomBoolean())) { assertNotNull(context); } @@ -479,7 +480,7 @@ public void testIgnoreScriptfieldIfSizeZero() throws IOException { final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); - try (ReaderContext reader = createReaderContext(indexShard, request); + try (ReaderContext reader = createReaderContext(indexShard); SearchContext context = service.createContext(reader, request, null, randomBoolean())) { assertEquals(0, context.scriptFields().fields().size()); } @@ -517,7 +518,7 @@ public void testMaxOpenScrollContexts() throws Exception { SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard); ElasticsearchException ex = expectThrows(ElasticsearchException.class, - () -> service.createAndPutReaderContext(rewriteContext)); + () -> service.createAndPutReaderContext(rewriteContext, randomBoolean())); assertEquals( "Trying to create too many scroll contexts. Must be less than or equal to: [" + SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " + @@ -633,7 +634,7 @@ public void testCanMatch() throws Exception { CountDownLatch latch = new CountDownLatch(1); SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); - service.executeQueryPhase(request, task, new ActionListener() { + service.executeQueryPhase(request, randomBoolean(), task, new ActionListener() { @Override public void onResponse(SearchPhaseResult searchPhaseResult) { try { @@ -775,7 +776,7 @@ public SearchType searchType() { throw new NullPointerException("expected"); } }; - try (ReaderContext reader = createReaderContext(indexService.getShard(shardId.id()), request)) { + try (ReaderContext reader = createReaderContext(indexService.getShard(shardId.id()))) { NullPointerException e = expectThrows(NullPointerException.class, () -> service.createContext(reader, request, null, randomBoolean())); assertEquals("expected", e.getMessage()); @@ -801,7 +802,7 @@ public void testMatchNoDocsEmptyResponse() throws InterruptedException { { CountDownLatch latch = new CountDownLatch(1); shardRequest.source().query(new MatchAllQueryBuilder()); - service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + service.executeQueryPhase(shardRequest, randomBoolean(), task, new ActionListener<>() { @Override public void onResponse(SearchPhaseResult result) { try { @@ -831,7 +832,7 @@ public void onFailure(Exception exc) { { CountDownLatch latch = new CountDownLatch(1); shardRequest.source().query(new MatchNoneQueryBuilder()); - service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + service.executeQueryPhase(shardRequest, randomBoolean(), task, new ActionListener<>() { @Override public void onResponse(SearchPhaseResult result) { try { @@ -861,7 +862,7 @@ public void onFailure(Exception exc) { { CountDownLatch latch = new CountDownLatch(1); shardRequest.canReturnNullResponseIfMatchNoDocs(true); - service.executeQueryPhase(shardRequest, task, new ActionListener<>() { + service.executeQueryPhase(shardRequest, randomBoolean(), task, new ActionListener<>() { @Override public void onResponse(SearchPhaseResult result) { try { @@ -918,8 +919,8 @@ public void testDeleteIndexWhileSearch() throws Exception { } } - private ReaderContext createReaderContext(IndexShard shard, ShardSearchRequest request) { + private ReaderContext createReaderContext(IndexShard shard) { Engine.Searcher searcher = shard.acquireSearcher("test"); - return new ReaderContext(randomNonNegativeLong(), shard, searcher, request); + return new ReaderContext(randomNonNegativeLong(), shard, searcher); } } diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index c67850436b393..2b25567e1fc73 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -24,6 +24,8 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.settings.Settings; @@ -35,6 +37,8 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregationsTests; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.suggest.SuggestTests; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -54,7 +58,11 @@ public QuerySearchResultTests() { private static QuerySearchResult createTestInstance() throws Exception { ShardId shardId = new ShardId("index", "uuid", randomInt()); - QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE)); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); + ShardSearchRequest shardSearchRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shardId, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, randomNonNegativeLong(), null, null); + QuerySearchResult result = new QuerySearchResult( + randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE), shardSearchRequest); if (randomBoolean()) { result.terminatedEarly(randomBoolean()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index 5ae5d498d7975..e1235bc92a5e4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -599,4 +599,9 @@ public SearchShardTask getTask() { public boolean isCancelled() { return task.isCancelled(); } + + @Override + public void addRescore(RescoreContext rescore) { + + } }