Skip to content

Commit

Permalink
Cut over from SearchContext to ReaderContext (#51282)
Browse files Browse the repository at this point in the history
With this change, we partially move the state of SearchContext to
ReaderContext. This is another step allowing us to move the state of
search to the coordinating node.

We will need several follow-ups to move the entire search state to the
coordinating node.

Relates #46523
  • Loading branch information
dnhatn committed Feb 22, 2020
1 parent 8b710d1 commit fb7386a
Show file tree
Hide file tree
Showing 30 changed files with 542 additions and 697 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,8 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
} finally {
clearReleasables(Lifetime.COLLECTION);
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
TopDocs topDocs = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,13 @@ public void writeTo(StreamOutput out) throws IOException {
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeContext(request.id());
boolean freed = searchService.freeReaderContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeContext(request.id());
boolean freed = searchService.freeReaderContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,7 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
} finally {
clearReleasables(Lifetime.COLLECTION);
}
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);

TopDocs td = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;

import java.util.HashMap;
Expand Down Expand Up @@ -146,25 +148,25 @@ private StatsHolder groupStats(String group) {
}

@Override
public void onNewContext(SearchContext context) {
public void onNewReaderContext(ReaderContext readerContext) {
openContexts.inc();
}

@Override
public void onFreeContext(SearchContext context) {
public void onFreeReaderContext(ReaderContext readerContext) {
openContexts.dec();
}

@Override
public void onNewScrollContext(SearchContext context) {
public void onNewScrollContext(ScrollContext scrollContext) {
totalStats.scrollCurrent.inc();
}

@Override
public void onFreeScrollContext(SearchContext context) {
public void onFreeScrollContext(ScrollContext scrollContext) {
totalStats.scrollCurrent.dec();
assert totalStats.scrollCurrent.count() >= 0;
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - context.getOriginNanoTime()));
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano()));
}

static final class StatsHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

Expand Down Expand Up @@ -76,34 +78,34 @@ default void onFailedFetchPhase(SearchContext searchContext) {}
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}

/**
* Executed when a new search context was created
* @param context the created context
* Executed when a new reader context was created
* @param readerContext the created context
*/
default void onNewContext(SearchContext context) {}
default void onNewReaderContext(ReaderContext readerContext) {}

/**
* Executed when a previously created search context is freed.
* Executed when a previously created reader context is freed.
* This happens either when the search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param context the freed search context
* @param readerContext the freed reader context
*/
default void onFreeContext(SearchContext context) {}
default void onFreeReaderContext(ReaderContext readerContext) {}

/**
* Executed when a new scroll search {@link SearchContext} was created
* @param context the created search context
* @param scrollContext the created search context
*/
default void onNewScrollContext(SearchContext context) {}
default void onNewScrollContext(ScrollContext scrollContext) {}

/**
* Executed when a scroll search {@link SearchContext} is freed.
* This happens either when the scroll search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param context the freed search context
* @param scrollContext the freed search context
*/
default void onFreeScrollContext(SearchContext context) {}
default void onFreeScrollContext(ScrollContext scrollContext) {}

/**
* Executed prior to using a {@link SearchContext} that has been retrieved
Expand All @@ -114,6 +116,14 @@ default void onFreeScrollContext(SearchContext context) {}
*/
default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {}

/**
* Executed when a search context was freed. The implementor can implement
* this method to release resources used by the search context.
*/
default void onFreeSearchContext(SearchContext context) {

}

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
Expand Down Expand Up @@ -193,43 +203,43 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
}

@Override
public void onNewContext(SearchContext context) {
public void onNewReaderContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewContext(context);
listener.onNewReaderContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeContext(SearchContext context) {
public void onFreeReaderContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeContext(context);
listener.onFreeReaderContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onNewScrollContext(SearchContext context) {
public void onNewScrollContext(ScrollContext scrollContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewScrollContext(context);
listener.onNewScrollContext(scrollContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeScrollContext(SearchContext context) {
public void onFreeScrollContext(ScrollContext scrollContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeScrollContext(context);
listener.onFreeScrollContext(scrollContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e);
}
Expand All @@ -248,5 +258,16 @@ public void validateSearchContext(SearchContext context, TransportRequest reques
}
ExceptionsHelper.reThrowIfNotNull(exception);
}

@Override
public void onFreeSearchContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeSearchContext(context);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeSearchContext listener [{}] failed", listener), e);
}
}
}
}
}
Loading

0 comments on commit fb7386a

Please sign in to comment.