improve bucket size aggregation limit
postpone the bucket size aggregation limit check for the terms aggregation
improve the min_doc_count == 0 special case: it loaded more terms from the
dict than the segment_size limit allows

Note that we now effectively check that segment_size does not exceed the
bucket limit. Since segment_size is 10 * size, a request with size 6500
would fail against a bucket limit of 65000.

closes #1822
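
A worked sketch of the arithmetic in that note; illustrative only, with assumed names, not the crate's API:

```rust
// Sketch: per the commit message, segment_size defaults to 10 * size.
fn main() {
    let size: u64 = 6_500;
    let segment_size = 10 * size; // 65_000
    let bucket_limit: u64 = 65_000; // assumed bucket limit from the note above
    // Per the note, a request with size 6500 reaches the bucket limit and is rejected.
    assert!(segment_size >= bucket_limit);
    println!("size={size} -> segment_size={segment_size}, bucket limit={bucket_limit}: rejected");
}
```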
PSeitz committed Jan 23, 2023
1 parent 2874554 commit 1b09dda
98 changes: 79 additions & 19 deletions src/aggregation/bucket/term_agg.rs
@@ -11,7 +11,7 @@ use crate::aggregation::agg_req_with_accessor::{
 use crate::aggregation::intermediate_agg_result::{
     IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult,
 };
-use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
+use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
 use crate::error::DataCorruption;
 use crate::fastfield::MultiValuedFastFieldReader;
 use crate::schema::Type;
@@ -268,21 +268,18 @@ impl TermBuckets {
         term_ids: &[u64],
         doc: DocId,
         sub_aggregation: &AggregationsWithAccessor,
-        bucket_count: &BucketCount,
         blueprint: &Option<SegmentAggregationResultsCollector>,
     ) -> crate::Result<()> {
         for &term_id in term_ids {
-            let entry = self.entries.entry(term_id as u32).or_insert_with(|| {
-                bucket_count.add_count(1);
-
-                TermBucketEntry::from_blueprint(blueprint)
-            });
+            let entry = self
+                .entries
+                .entry(term_id as u32)
+                .or_insert_with(|| TermBucketEntry::from_blueprint(blueprint));
             entry.doc_count += 1;
             if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
                 sub_aggregations.collect(doc, sub_aggregation)?;
             }
         }
-        bucket_count.validate_bucket_count()?;
 
         Ok(())
     }
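
The per-increment accounting removed above moves to a single check per segment, applied after the term dict is assembled (see the @@ -433,6 +434,8 @@ hunk below). A minimal sketch of that deferred pattern, with simplified stand-in types (the crate's actual BucketCount representation and error types differ):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Stand-in for the crate's BucketCount; the field names match the bench code
// at the bottom of this diff, but this concrete representation is an assumption.
struct BucketCount {
    bucket_count: AtomicU32,
    max_bucket_count: u32,
}

impl BucketCount {
    fn add_count(&self, count: u32) {
        self.bucket_count.fetch_add(count, Ordering::Relaxed);
    }

    fn validate_bucket_count(&self) -> Result<(), String> {
        if self.bucket_count.load(Ordering::Relaxed) > self.max_bucket_count {
            return Err("aborting aggregation: too many buckets".to_string());
        }
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let bucket_count = BucketCount {
        bucket_count: AtomicU32::new(0),
        max_bucket_count: 65_000,
    };
    // Old flow: add_count(1) + validate inside every bucket increment.
    // New flow: one add_count(dict.len() as u32) + one validate per segment.
    bucket_count.add_count(64_999);
    bucket_count.validate_bucket_count()
}
```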
@@ -372,7 +369,7 @@ impl SegmentTermCollector {
             }
             OrderTarget::SubAggregation(_name) => {
                 // don't sort and cut off since it's hard to make assumptions on the quality of the
-                // results when cutting off du to unknown nature of the sub_aggregation (possible
+                // results when cutting off due to unknown nature of the sub_aggregation (possible
                 // to check).
             }
             OrderTarget::Count => {
@@ -412,6 +409,10 @@ impl SegmentTermCollector {
         if self.req.min_doc_count == 0 {
             let mut stream = term_dict.stream()?;
             while let Some((key, _ord)) = stream.next() {
+                if dict.len() >= self.req.segment_size as usize {
+                    break;
+                }
+
                 let key = std::str::from_utf8(key)
                     .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
                 if !dict.contains_key(key) {
@@ -433,6 +434,8 @@
             sum_other_doc_count += sum_other_docs;
             dict = dict_entries.into_iter().collect();
         }
+        agg_with_accessor.bucket_count.add_count(dict.len() as u32);
+        agg_with_accessor.bucket_count.validate_bucket_count()?;
 
         Ok(IntermediateBucketResult::Terms(
             IntermediateTermBucketResult {
@@ -469,28 +472,24 @@
                 &vals1,
                 docs[0],
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
             self.term_buckets.increment_bucket(
                 &vals2,
                 docs[1],
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
             self.term_buckets.increment_bucket(
                 &vals3,
                 docs[2],
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
             self.term_buckets.increment_bucket(
                 &vals4,
                 docs[3],
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
         }
@@ -501,7 +500,6 @@ impl SegmentTermCollector {
                 &vals1,
                 doc,
                 &bucket_with_accessor.sub_aggregation,
-                &bucket_with_accessor.bucket_count,
                 &self.blueprint,
             )?;
         }
@@ -1136,6 +1134,33 @@ mod tests {
         assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
         assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
 
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    min_doc_count: Some(0),
+                    size: Some(1),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+
+        // searching for terma, but with min_doc_count == 0 all terms are returned
+        let res = exec_request_with_query(agg_req, &index, Some(("string_id", "terma")))?;
+
+        assert_eq!(res["my_texts"]["buckets"][0]["key"], "terma");
+        assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
+        assert_eq!(
+            res["my_texts"]["buckets"][1]["key"],
+            serde_json::Value::Null
+        );
+        assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
+        assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0);
+
         Ok(())
     }

@@ -1214,6 +1239,27 @@ mod tests {
 
         let index = get_test_index_from_terms(true, &terms_per_segment)?;
 
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    // min_doc_count: Some(0),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+
+        let res = exec_request_with_query(agg_req, &index, None);
+
+        assert!(res.is_ok());
+
+        // This request has min_doc_count set to 0.
+        // That means we potentially load the whole dict.
+        // Make sure the bucket count is still fine.
         let agg_req: Aggregations = vec![(
             "my_texts".to_string(),
             Aggregation::Bucket(BucketAggregation {
@@ -1228,6 +1274,24 @@
         .into_iter()
         .collect();
 
+        let res = exec_request_with_query(agg_req, &index, None);
+        assert!(res.is_ok());
+
+        let agg_req: Aggregations = vec![(
+            "my_texts".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Terms(TermsAggregation {
+                    field: "string_id".to_string(),
+                    // min_doc_count: Some(0),
+                    size: Some(70_000),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+
         let res = exec_request_with_query(agg_req, &index, None);
 
         assert!(res.is_err());
@@ -1384,14 +1448,10 @@ mod bench {
         let mut collector = get_collector_with_buckets(total_terms);
         let vals = get_rand_terms(total_terms, num_terms);
         let aggregations_with_accessor: AggregationsWithAccessor = Default::default();
-        let bucket_count: BucketCount = BucketCount {
-            bucket_count: Default::default(),
-            max_bucket_count: 1_000_001u32,
-        };
         b.iter(|| {
             for &val in &vals {
                 collector
-                    .increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None)
+                    .increment_bucket(&[val], 0, &aggregations_with_accessor, &None)
                     .unwrap();
             }
         })