Add a shard filter search phase to pre-filter shards based on query rewriting #25658
Conversation
…ewriting
Today if we search across a large amount of shards we hit every shard. Yet, it's quite common to search across an index pattern for time based indices but filtering will exclude all results outside a certain time range i.e. `now-3d`. While the search can potentially hit hundreds of shards, the majority of the shards might yield 0 results since there is no document that is within this date range. Kibana for instance does this regularly but used `_field_stats` to optimize the indices they need to query. Now with the deprecation of `_field_stats` and its upcoming removal, a single dashboard in kibana can potentially turn into searches hitting hundreds or thousands of shards, and that can easily cause search rejections even though most of the requests are very likely super cheap and only need a query rewrite to early terminate with 0 results.
This change adds a pre-filter phase for searches that can, if the number of shards is higher than the `pre_filter_shards_after` threshold (defaults to 128 shards), fan out to the shards and check if the query can potentially match any documents at all. While false positives are possible, a negative response means that no matches are possible. These requests are not subject to rejection and can greatly reduce the number of shards a request needs to hit. The approach here is preferable to the kibana approach with field stats since it correctly handles aliases and uses the correct threadpools to execute these requests. Further, it's completely transparent to the user and improves scalability of elasticsearch in general on large clusters.
@jpountz @jimczi I will need to do some work on the unit-test end but I wanted to get it out here asap for first rounds and opinions. I would also like @clintongormley to look into the naming of the parameter; I am not a huge fan of it.
/cc @spalger
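To make the mechanism concrete, here is a rough sketch of what the per-shard check described above boils down to. This is a hypothetical, simplified helper (the class and method names are illustrative, not the code in this PR); it only assumes that query rewriting can turn a query into `MatchNoneQueryBuilder` when no document on the shard can match:

```java
import java.io.IOException;

import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;

public class CanMatchSketch {

    // Report "cannot match" only if the query rewrites to match_none against the shard's
    // context. False positives are fine, false negatives are not.
    static boolean canMatch(QueryBuilder query, QueryRewriteContext context) throws IOException {
        if (query == null) {
            return true; // no query is equivalent to match_all, the shard must be searched
        }
        QueryBuilder rewritten = query;
        QueryBuilder previous;
        do {
            // rewrite until a fixed point is reached; e.g. a range query on a time field whose
            // per-shard bounds fall outside the requested range can become a MatchNoneQueryBuilder
            previous = rewritten;
            rewritten = rewritten.rewrite(context);
        } while (rewritten != previous);
        return rewritten instanceof MatchNoneQueryBuilder == false;
    }
}
```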
I left some minor comments but I love it.
This is a nice solution not only for time-based search, so a huge +1.
        } else if (results.numMatches == 0) {
            // this is a special case where we have no hit but we need to get at least one search response in order
            // to produce a valid search result with all the aggs etc. at least that is what I think is the case... and clint does so
            // too :D
I agree too ;)
It's extra work since, for instance, global ords or fielddata could be loaded by this single search, but we can optimize this later. It's already a huge win since this will avoid the loading on the other shards!
@@ -58,6 +58,8 @@

    private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

    public static final int DEFAULT_PRE_FILTER_SHARDS_AFTER = 1;
What is the default? 128 like below or 1?
haha yeah true I wanted to trigger this constantly so I changed this but didn't revert
    if (source != null) {
        QueryBuilder queryBuilder = source.query();
        AggregatorFactories.Builder aggregations = source.aggregations();
        boolean hasGlobalAggs = aggregations != null && aggregations.hasGlobalAggregationBuilder();
This could be checked on the coordinating node instead to save the round trip, since if there is a global agg all shards must match?
++ will do that
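A minimal sketch of what that coordinator-side shortcut could look like, assuming the `hasGlobalAggregationBuilder()` helper from the quoted diff; the method name and structure here are illustrative, not the actual change:

```java
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class PreFilterDecisionSketch {

    // Decide on the coordinating node whether the can-match round trip is worth doing at all.
    static boolean shouldPreFilterShards(SearchSourceBuilder source, int shardCount, int preFilterShardsAfter) {
        if (shardCount < preFilterShardsAfter) {
            return false; // too few shards for the extra round trip to pay off
        }
        AggregatorFactories.Builder aggs = source == null ? null : source.aggregations();
        if (aggs != null && aggs.hasGlobalAggregationBuilder()) {
            return false; // a global agg has to see every shard, so nothing can be skipped
        }
        return true;
    }
}
```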
          pre_filter_shards_after: 1
          body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } }

  - match: { _shards.total: 2 }
I understand why it's important for testing this feature, but shouldn't we return the total number of shards before pre-filtering? I think it should be transparent and not modify the total here, otherwise it becomes hard to understand why some shards are in and some are not?
Agreed I would like it better if it was transparent.
I can try but it might complicate things to be honest...
/cc @n0othing @astefan @gingerwizard @inqueue as we talked about this a few days ago.
I left some thoughts.
@@ -58,6 +58,8 @@

    private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

    public static final int DEFAULT_PRE_FILTER_SHARDS_AFTER = 1;
docs claim the default is 128?
haha yeah true I wanted to trigger this constantly so I changed this but didn't revert
    if (source != null) {
        QueryBuilder queryBuilder = source.query();
        AggregatorFactories.Builder aggregations = source.aggregations();
        boolean hasGlobalAggs = aggregations != null && aggregations.hasGlobalAggregationBuilder();
uh oh oh I would have forgotten about this guy. I guess testing found it?
I believe there is a similar case with minDocCount=0 on terms aggs which exposes all terms contained in the terms dict of doc values.
> I believe there is a similar case with minDocCount=0 on terms aggs which exposes all terms contained in the terms dict of doc values.

Can you elaborate on this? I am not sure how I can check that.
Good catch. This means that we need to check all root aggregations and make sure that none of them can return buckets when the query is MatchNone.
I think we could/should make the aggregation rewriting aware of the query rewriting.
Currently we rewrite aggregations on the shards but they are not supposed to check the query. Instead we could just pass the rewritten query when we rewrite aggs, and if the query cannot match documents the agg could be rewritten into a MatchNoneAggregationBuilder. Then we could have special cases for aggs like a root terms aggregation with minDocCount set to 0, and canMatch could check after the aggs rewriting that all root aggregations are MatchNoneAggregationBuilder?
As a first step, I'd just do instanceof checks for TermsAggregationBuilder and (Date)HistogramAggregationBuilder, and check the value of minDocCount.
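A minimal sketch of that instanceof approach, under the assumption that the `minDocCount()` getters shown here are available on the builders (the helper name and class are made up for illustration):

```java
import java.util.Collection;

import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

public class MinDocCountCheckSketch {

    // A shard whose query rewrites to match_none can still return buckets if a root
    // terms/(date_)histogram aggregation uses min_doc_count == 0, since empty buckets
    // come from the terms dictionary / rounding alone. Such a shard must not be skipped.
    static boolean aggsForceShardExecution(Collection<AggregationBuilder> rootAggs) {
        for (AggregationBuilder agg : rootAggs) {
            if (agg instanceof TermsAggregationBuilder
                    && ((TermsAggregationBuilder) agg).minDocCount() == 0) {
                return true;
            }
            if (agg instanceof HistogramAggregationBuilder
                    && ((HistogramAggregationBuilder) agg).minDocCount() == 0) {
                return true;
            }
            if (agg instanceof DateHistogramAggregationBuilder
                    && ((DateHistogramAggregationBuilder) agg).minDocCount() == 0) {
                return true;
            }
        }
        return false;
    }
}
```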
@@ -105,7 +105,8 @@ private void innerRun() throws IOException {
                -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
                    queryResults : fetchResults);
        if (queryAndFetchOptimization) {
            assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
            assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults emtpy [" + phaseResults.isEmpty()
nit: emtpy --> empty
    List<SearchRequest> requests = multiRequest.requests();
    preFilterShardsAfter = Math.max(1, preFilterShardsAfter / (requests.size()+1));
    for (SearchRequest request : requests) {
        request.setPreFilterSearchShardsAfter(preFilterShardsAfter);
Should we check if preFilterShardsAfter has been set explicitly on the search request and set it to the min of preFilterShardsAfter and request.getPreFilterSearchShardsAfter()? Not sure if this would actually matter in practice?
good catch I will do that
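Something along these lines, reusing the setter/getter names from the quoted diff and the comment above; this is a sketch of the suggestion, not the committed code:

```java
import java.util.List;

import org.elasticsearch.action.search.SearchRequest;

public class MsearchPreFilterSketch {

    // Derive a smaller per-request threshold for msearch, as in the quoted diff, but take the
    // min with whatever is already set on the individual request so a stricter explicit value
    // is not overridden.
    static void applyPreFilterThreshold(List<SearchRequest> requests, int preFilterShardsAfter) {
        int derived = Math.max(1, preFilterShardsAfter / (requests.size() + 1));
        for (SearchRequest request : requests) {
            request.setPreFilterSearchShardsAfter(Math.min(derived, request.getPreFilterSearchShardsAfter()));
        }
    }
}
```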
        AggregatorFactories.Builder aggregations = source.aggregations();
        boolean hasGlobalAggs = aggregations != null && aggregations.hasGlobalAggregationBuilder();
        if (queryBuilder != null && hasGlobalAggs == false) { // we need to executed hasGlobalAggs is equivalent to match all
            return queryBuilder instanceof MatchNoneQueryBuilder == false;
As far as I can see the only time this will be hit is if the query is a simple range query which does not overlap with the data on the shard as we only check the root query type. This means that if you have a boolean query with a must/filter range clause and other clauses this won't be rewritten to a match none query and therefore will still cause the search request to hit that shard. To me this seems like a fairly common case for search. Maybe we should change the rewrite of the BoolQueryBuilder to rewrite to a match none query if any of the must/filter clauses are match_none to catch these cases too? (I can add this in a separate PR after this is merged)
this is irrelevant as I hadn't seen #25650
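For context, the rewrite being described (which appears to be what #25650 addresses) would look roughly like the following; this is an illustrative sketch, not the code from that PR:

```java
import java.io.IOException;

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;

public class BoolRewriteSketch {

    // If any must/filter clause of a bool query rewrites to match_none, the whole bool query
    // can never match any document, so it can itself rewrite to match_none and let the
    // pre-filter phase skip the shard.
    static QueryBuilder rewriteBool(BoolQueryBuilder bool, QueryRewriteContext context) throws IOException {
        for (QueryBuilder clause : bool.must()) {
            if (clause.rewrite(context) instanceof MatchNoneQueryBuilder) {
                return new MatchNoneQueryBuilder();
            }
        }
        for (QueryBuilder clause : bool.filter()) {
            if (clause.rewrite(context) instanceof MatchNoneQueryBuilder) {
                return new MatchNoneQueryBuilder();
            }
        }
        return bool; // should/must_not clauses alone cannot force match_none here
    }
}
```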
I don't have much in the way of suggestions, but we have
Thanks for keeping the "total" `_shards.total`.
For the `min_doc_count` issue I agree with Adrien, just checking the root aggregation builders should be enough.
* master: (181 commits)
  Use a non default port range in MockTransportService
  Add a shard filter search phase to pre-filter shards based on query rewriting (elastic#25658)
  Prevent excessive disk consumption by log files
  Migrate RestHttpResponseHeadersIT to ESRestTestCase (elastic#25675)
  Use config directory to find jvm.options
  Fix inadvertent rename of systemd tests
  Adding basic search request documentation for high level client (elastic#25651)
  Disallow lang to be used with Stored Scripts (elastic#25610)
  Fix typo in ScriptDocValues deprecation warnings (elastic#25672)
  Changes DocValueFieldsFetchSubPhase to reuse doc values iterators for multiple hits (elastic#25644)
  Query range fields by doc values when they are expected to be more efficient than points.
  Remove SearchHit#internalHits (elastic#25653)
  [DOCS] Reorganized the highlighting topic so it's less confusing.
  Add an underscore to flood stage setting
  Avoid failing install if system-sysctl is masked
  Add another parent value option to join documentation (elastic#25609)
  Ensure we rewrite common queries to `match_none` if possible (elastic#25650)
  Remove reference to field-stats docs.
  Optimize the order of bytes in uuids for better compression. (elastic#24615)
  Fix BytesReferenceStreamInput#skip with offset (elastic#25634)
  ...
…tion in mixed version
6.0 applies some optimization to query rewriting if the number of shards is large. In order to make use of this optimization, this commit adds the internal endpoint to 5.6 such that a 6.0 coordinator node can make use of the feature even in a mixed cluster or via cross cluster search. Relates to elastic#25658
…ewriting (#25658)
Today if we search across a large amount of shards we hit every shard. Yet, it's quite common to search across an index pattern for time based indices but filtering will exclude all results outside a certain time range i.e. `now-3d`. While the search can potentially hit hundreds of shards, the majority of the shards might yield 0 results since there is no document that is within this date range. Kibana for instance does this regularly but used `_field_stats` to optimize the indices they need to query. Now with the deprecation of `_field_stats` and its upcoming removal, a single dashboard in kibana can potentially turn into searches hitting hundreds or thousands of shards, and that can easily cause search rejections even though most of the requests are very likely super cheap and only need a query rewrite to early terminate with 0 results. This change adds a pre-filter phase for searches that can, if the number of shards is higher than the `pre_filter_shard_size` threshold (defaults to 128 shards), fan out to the shards and check if the query can potentially match any documents at all. While false positives are possible, a negative response means that no matches are possible. These requests are not subject to rejection and can greatly reduce the number of shards a request needs to hit. The approach here is preferable to the kibana approach with field stats since it correctly handles aliases and uses the correct threadpools to execute these requests. Further, it's completely transparent to the user and improves scalability of elasticsearch in general on large clusters.
Even if the query part can rewrite to match none we can't skip the suggest execution since it might yield results. Relates to elastic#25658
@@ -282,10 +286,22 @@ public void writeTo(StreamOutput out) throws IOException {
        }
    }

    public Builder addAggregators(AggregatorFactories factories) {
        throw new UnsupportedOperationException("This needs to be removed");

    public boolean mustVisiteAllDocs() {
extra 'e'
This is a great idea, but it looks like failIfOverShardCountLimit is called before the shards are skipped. Is there any reason it has to be like this? Obviously, if I query index-* with a small time range query the pre-filter would bring the shard count to < 1000, but it will still trip failIfOverShardCountLimit. So, naively, it looks like it would be better to do the failIfOverShardCountLimit check after.
Ok, I spoke too soon. Looks like failIfOverShardCountLimit defaults to disabled in the latest branch. This all makes sense now.
@s1monw Would you be able to expand on the origin of the default shard threshold of 128? Is there a reasonable rule of thumb in terms of the amount of overhead per shard incurred by pre-filtering? Thanks!