From c350a6d6e7c8dbfdd7f8e99cbaea74a45b493f56 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 16 May 2023 15:27:55 +0800 Subject: [PATCH 1/8] switch to ms in histogram for date type switch to ms in histogram, by adding a normalization step that converts to nanoseconds precision when creating the collector. closes #2028 related to #2026 --- columnar/src/columnar/column_type.rs | 3 + src/aggregation/agg_req_with_accessor.rs | 6 +- .../bucket/histogram/date_histogram.rs | 102 +++++++++--------- src/aggregation/bucket/histogram/histogram.rs | 88 +++++++++------ src/aggregation/date.rs | 13 +++ src/aggregation/intermediate_agg_result.rs | 13 +-- src/aggregation/segment_agg_result.rs | 4 +- 7 files changed, 131 insertions(+), 98 deletions(-) diff --git a/columnar/src/columnar/column_type.rs b/columnar/src/columnar/column_type.rs index 835efdd8b7..ac61a7253f 100644 --- a/columnar/src/columnar/column_type.rs +++ b/columnar/src/columnar/column_type.rs @@ -54,6 +54,9 @@ impl ColumnType { pub fn to_code(self) -> u8 { self as u8 } + pub fn is_date_time(&self) -> bool { + self == &ColumnType::DateTime + } pub(crate) fn try_from_code(code: u8) -> Result { COLUMN_TYPES.get(code as usize).copied().ok_or(InvalidData) diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 329b590786..347d8e3d3c 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -32,12 +32,13 @@ impl AggregationsWithAccessor { pub struct AggregationWithAccessor { /// In general there can be buckets without fast field access, e.g. buckets that are created - /// based on search terms. So eventually this needs to be Option or moved. + /// based on search terms. That is not that case currently, but eventually this needs to be + /// Option or moved. pub(crate) accessor: Column, pub(crate) str_dict_column: Option, pub(crate) field_type: ColumnType, /// In case there are multiple types of fast fields, e.g. string and numeric. - /// Only used for term aggregations + /// Only used for term aggregations currently. pub(crate) accessor2: Option<(Column, ColumnType)>, pub(crate) sub_aggregation: AggregationsWithAccessor, pub(crate) limits: ResourceLimitGuard, @@ -105,6 +106,7 @@ impl AggregationWithAccessor { (accessor, field_type) } }; + let sub_aggregation = sub_aggregation.clone(); Ok(AggregationWithAccessor { accessor, diff --git a/src/aggregation/bucket/histogram/date_histogram.rs b/src/aggregation/bucket/histogram/date_histogram.rs index 6188973c24..d745236a03 100644 --- a/src/aggregation/bucket/histogram/date_histogram.rs +++ b/src/aggregation/bucket/histogram/date_histogram.rs @@ -77,7 +77,7 @@ pub struct DateHistogramAggregationReq { /// hard_bounds only limits the buckets, to force a range set both extended_bounds and /// hard_bounds to the same range. /// - /// Needs to be provided as timestamp in nanosecond precision. + /// Needs to be provided as timestamp in milliseconds precision. /// /// ## Example /// ```json @@ -88,7 +88,7 @@ pub struct DateHistogramAggregationReq { /// "interval": "1d", /// "hard_bounds": { /// "min": 0, - /// "max": 1420502400000000000 + /// "max": 1420502400000 /// } /// } /// } @@ -114,11 +114,11 @@ impl DateHistogramAggregationReq { self.validate()?; Ok(HistogramAggregation { field: self.field.to_string(), - interval: parse_into_nanoseconds(self.fixed_interval.as_ref().unwrap())? as f64, + interval: parse_into_milliseconds(self.fixed_interval.as_ref().unwrap())? 
as f64, offset: self .offset .as_ref() - .map(|offset| parse_offset_into_nanosecs(offset)) + .map(|offset| parse_offset_into_milliseconds(offset)) .transpose()? .map(|el| el as f64), min_doc_count: self.min_doc_count, @@ -153,7 +153,7 @@ impl DateHistogramAggregationReq { )); } - parse_into_nanoseconds(self.fixed_interval.as_ref().unwrap())?; + parse_into_milliseconds(self.fixed_interval.as_ref().unwrap())?; Ok(()) } @@ -179,7 +179,7 @@ pub enum DateHistogramParseError { OutOfBounds(String), } -fn parse_offset_into_nanosecs(input: &str) -> Result { +fn parse_offset_into_milliseconds(input: &str) -> Result { let is_sign = |byte| &[byte] == b"-" || &[byte] == b"+"; if input.is_empty() { return Err(DateHistogramParseError::InvalidOffset(input.to_string()).into()); @@ -188,18 +188,18 @@ fn parse_offset_into_nanosecs(input: &str) -> Result { let has_sign = is_sign(input.as_bytes()[0]); if has_sign { let (sign, input) = input.split_at(1); - let val = parse_into_nanoseconds(input)?; + let val = parse_into_milliseconds(input)?; if sign == "-" { Ok(-val) } else { Ok(val) } } else { - parse_into_nanoseconds(input) + parse_into_milliseconds(input) } } -fn parse_into_nanoseconds(input: &str) -> Result { +fn parse_into_milliseconds(input: &str) -> Result { let split_boundary = input .as_bytes() .iter() @@ -218,7 +218,7 @@ fn parse_into_nanoseconds(input: &str) -> Result { // here and being defensive does not hurt. .map_err(|_err| DateHistogramParseError::NumberMissing(input.to_string()))?; - let multiplier_from_unit = match unit { + let unit_in_ms = match unit { "ms" => 1, "s" => 1000, "m" => 60 * 1000, @@ -227,8 +227,8 @@ fn parse_into_nanoseconds(input: &str) -> Result { _ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string()).into()), }; - let val = (number * multiplier_from_unit) - .checked_mul(1_000_000) + let val = number * unit_in_ms; + val.checked_mul(1_000_000) .ok_or_else(|| DateHistogramParseError::OutOfBounds(input.to_string()))?; Ok(val) @@ -246,49 +246,49 @@ mod tests { use crate::Index; #[test] - fn test_parse_into_nanosecs() { - assert_eq!(parse_into_nanoseconds("1m").unwrap(), 60_000_000_000); - assert_eq!(parse_into_nanoseconds("2m").unwrap(), 120_000_000_000); + fn test_parse_into_millisecs() { + assert_eq!(parse_into_milliseconds("1m").unwrap(), 60_000); + assert_eq!(parse_into_milliseconds("2m").unwrap(), 120_000); assert_eq!( - parse_into_nanoseconds("2y").unwrap_err(), + parse_into_milliseconds("2y").unwrap_err(), DateHistogramParseError::UnitNotRecognized("y".to_string()).into() ); assert_eq!( - parse_into_nanoseconds("2000").unwrap_err(), + parse_into_milliseconds("2000").unwrap_err(), DateHistogramParseError::UnitMissing("2000".to_string()).into() ); assert_eq!( - parse_into_nanoseconds("ms").unwrap_err(), + parse_into_milliseconds("ms").unwrap_err(), DateHistogramParseError::NumberMissing("ms".to_string()).into() ); } #[test] - fn test_parse_offset_into_nanosecs() { - assert_eq!(parse_offset_into_nanosecs("1m").unwrap(), 60_000_000_000); - assert_eq!(parse_offset_into_nanosecs("+1m").unwrap(), 60_000_000_000); - assert_eq!(parse_offset_into_nanosecs("-1m").unwrap(), -60_000_000_000); - assert_eq!(parse_offset_into_nanosecs("2m").unwrap(), 120_000_000_000); - assert_eq!(parse_offset_into_nanosecs("+2m").unwrap(), 120_000_000_000); - assert_eq!(parse_offset_into_nanosecs("-2m").unwrap(), -120_000_000_000); - assert_eq!(parse_offset_into_nanosecs("-2ms").unwrap(), -2_000_000); + fn test_parse_offset_into_milliseconds() { + 
assert_eq!(parse_offset_into_milliseconds("1m").unwrap(), 60_000); + assert_eq!(parse_offset_into_milliseconds("+1m").unwrap(), 60_000); + assert_eq!(parse_offset_into_milliseconds("-1m").unwrap(), -60_000); + assert_eq!(parse_offset_into_milliseconds("2m").unwrap(), 120_000); + assert_eq!(parse_offset_into_milliseconds("+2m").unwrap(), 120_000); + assert_eq!(parse_offset_into_milliseconds("-2m").unwrap(), -120_000); + assert_eq!(parse_offset_into_milliseconds("-2ms").unwrap(), -2); assert_eq!( - parse_offset_into_nanosecs("2y").unwrap_err(), + parse_offset_into_milliseconds("2y").unwrap_err(), DateHistogramParseError::UnitNotRecognized("y".to_string()).into() ); assert_eq!( - parse_offset_into_nanosecs("2000").unwrap_err(), + parse_offset_into_milliseconds("2000").unwrap_err(), DateHistogramParseError::UnitMissing("2000".to_string()).into() ); assert_eq!( - parse_offset_into_nanosecs("ms").unwrap_err(), + parse_offset_into_milliseconds("ms").unwrap_err(), DateHistogramParseError::NumberMissing("ms".to_string()).into() ); } #[test] fn test_parse_into_milliseconds_do_not_accept_non_ascii() { - assert!(parse_into_nanoseconds("1m").is_err()); + assert!(parse_into_milliseconds("1m").is_err()); } pub fn get_test_index_from_docs( @@ -369,7 +369,7 @@ mod tests { "buckets" : [ { "key_as_string" : "2015-01-01T00:00:00Z", - "key" : 1420070400000000000.0, + "key" : 1420070400000.0, "doc_count" : 4 } ] @@ -407,7 +407,7 @@ mod tests { "buckets" : [ { "key_as_string" : "2015-01-01T00:00:00Z", - "key" : 1420070400000000000.0, + "key" : 1420070400000.0, "doc_count" : 4, "texts": { "buckets": [ @@ -456,32 +456,32 @@ mod tests { "buckets": [ { "doc_count": 2, - "key": 1420070400000000000.0, + "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" }, { "doc_count": 1, - "key": 1420156800000000000.0, + "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" }, { "doc_count": 0, - "key": 1420243200000000000.0, + "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" }, { "doc_count": 0, - "key": 1420329600000000000.0, + "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" }, { "doc_count": 0, - "key": 1420416000000000000.0, + "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" }, { "doc_count": 1, - "key": 1420502400000000000.0, + "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" } ] @@ -499,8 +499,8 @@ mod tests { "field": "date", "fixed_interval": "1d", "extended_bounds": { - "min": 1419984000000000000.0, - "max": 1420588800000000000.0 + "min": 1419984000000.0, + "max": 1420588800000.0 } } } @@ -517,42 +517,42 @@ mod tests { "buckets": [ { "doc_count": 0, - "key": 1419984000000000000.0, + "key": 1419984000000.0, "key_as_string": "2014-12-31T00:00:00Z" }, { "doc_count": 2, - "key": 1420070400000000000.0, + "key": 1420070400000.0, "key_as_string": "2015-01-01T00:00:00Z" }, { "doc_count": 1, - "key": 1420156800000000000.0, + "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" }, { "doc_count": 0, - "key": 1420243200000000000.0, + "key": 1420243200000.0, "key_as_string": "2015-01-03T00:00:00Z" }, { "doc_count": 0, - "key": 1420329600000000000.0, + "key": 1420329600000.0, "key_as_string": "2015-01-04T00:00:00Z" }, { "doc_count": 0, - "key": 1420416000000000000.0, + "key": 1420416000000.0, "key_as_string": "2015-01-05T00:00:00Z" }, { "doc_count": 1, - "key": 1420502400000000000.0, + "key": 1420502400000.0, "key_as_string": "2015-01-06T00:00:00Z" }, { "doc_count": 0, - "key": 1.4205888e18, + "key": 1420588800000.0, "key_as_string": 
"2015-01-07T00:00:00Z" } ] @@ -569,8 +569,8 @@ mod tests { "field": "date", "fixed_interval": "1d", "hard_bounds": { - "min": 1420156800000000000.0, - "max": 1420243200000000000.0 + "min": 1420156800000.0, + "max": 1420243200000.0 } } } @@ -587,7 +587,7 @@ mod tests { "buckets": [ { "doc_count": 1, - "key": 1420156800000000000.0, + "key": 1420156800000.0, "key_as_string": "2015-01-02T00:00:00Z" } ] diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index cb35d1da3d..cf01f6cf4c 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -13,6 +13,8 @@ use crate::aggregation::agg_req_with_accessor::{ AggregationWithAccessor, AggregationsWithAccessor, }; use crate::aggregation::agg_result::BucketEntry; +use crate::aggregation::date::format_date_ms; +use crate::aggregation::f64_from_fastfield_u64; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry, @@ -20,7 +22,6 @@ use crate::aggregation::intermediate_agg_result::{ use crate::aggregation::segment_agg_result::{ build_segment_agg_collector, AggregationLimits, SegmentAggregationCollector, }; -use crate::aggregation::{f64_from_fastfield_u64, format_date}; use crate::TantivyError; /// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`. @@ -125,6 +126,22 @@ pub struct HistogramAggregation { } impl HistogramAggregation { + pub(crate) fn normalize(&mut self, column_type: ColumnType) { + if column_type.is_date_time() { + // values are provided in ms, but the fastfield is in nano seconds + self.interval *= 1_000_000.0; + self.offset = self.offset.map(|off| off * 1_000_000.0); + self.hard_bounds = self.hard_bounds.map(|bounds| HistogramBounds { + min: bounds.min * 1_000_000.0, + max: bounds.max * 1_000_000.0, + }); + self.extended_bounds = self.extended_bounds.map(|bounds| HistogramBounds { + min: bounds.min * 1_000_000.0, + max: bounds.max * 1_000_000.0, + }); + } + } + fn validate(&self) -> crate::Result<()> { if self.interval <= 0.0f64 { return Err(TantivyError::InvalidArgument( @@ -187,12 +204,14 @@ pub(crate) struct SegmentHistogramBucketEntry { impl SegmentHistogramBucketEntry { pub(crate) fn into_intermediate_bucket_entry( self, - sub_aggregation: Box, + sub_aggregation: Option>, agg_with_accessor: &AggregationsWithAccessor, ) -> crate::Result { let mut sub_aggregation_res = IntermediateAggregationResults::default(); - sub_aggregation - .add_intermediate_aggregation_result(agg_with_accessor, &mut sub_aggregation_res)?; + if let Some(sub_aggregation) = sub_aggregation { + sub_aggregation + .add_intermediate_aggregation_result(agg_with_accessor, &mut sub_aggregation_res)?; + } Ok(IntermediateHistogramBucketEntry { key: self.key, doc_count: self.doc_count, @@ -312,19 +331,15 @@ impl SegmentHistogramCollector { ) -> crate::Result { let mut buckets = Vec::with_capacity(self.buckets.len()); - if self.sub_aggregation_blueprint.is_some() { - for (bucket_pos, bucket) in self.buckets.into_iter() { - let bucket_res = bucket.into_intermediate_bucket_entry( - self.sub_aggregations.get(&bucket_pos).unwrap().clone(), - &agg_with_accessor.sub_aggregation, - ); + for (bucket_pos, bucket) in self.buckets.into_iter() { + let bucket_res = bucket.into_intermediate_bucket_entry( + self.sub_aggregations.get(&bucket_pos).cloned(), + &agg_with_accessor.sub_aggregation, + ); - 
buckets.push(bucket_res?); - } - } else { - buckets.extend(self.buckets.into_values().map(|bucket| bucket.into())); - }; - buckets.sort_unstable_by(|b1, b2| b1.key.partial_cmp(&b2.key).unwrap_or(Ordering::Equal)); + buckets.push(bucket_res?); + } + buckets.sort_unstable_by(|b1, b2| b1.key.total_cmp(&b2.key)); Ok(IntermediateBucketResult::Histogram { buckets, @@ -333,12 +348,13 @@ impl SegmentHistogramCollector { } pub(crate) fn from_req_and_validate( - req: &HistogramAggregation, + mut req: HistogramAggregation, sub_aggregation: &mut AggregationsWithAccessor, field_type: ColumnType, accessor_idx: usize, ) -> crate::Result { req.validate()?; + req.normalize(field_type); let sub_aggregation_blueprint = if sub_aggregation.is_empty() { None @@ -396,11 +412,11 @@ fn intermediate_buckets_to_final_buckets_fill_gaps( // memory check upfront let (_, first_bucket_num, last_bucket_num) = generate_bucket_pos_with_opt_minmax(histogram_req, min_max); - let added_buckets = (first_bucket_num..=last_bucket_num) - .count() - .saturating_sub(buckets.len()); + // It's based user input, so we need to account for overflows + let added_buckets = ((last_bucket_num.saturating_sub(first_bucket_num)).max(0) as u64) + .saturating_sub(buckets.len() as u64); limits.add_memory_consumed( - added_buckets as u64 * std::mem::size_of::() as u64, + added_buckets * std::mem::size_of::() as u64, )?; // create buckets let fill_gaps_buckets = generate_buckets_with_opt_minmax(histogram_req, min_max); @@ -409,7 +425,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps( // Use merge_join_by to fill in gaps, since buckets are sorted - buckets + let final_buckets: Vec = buckets .into_iter() .merge_join_by( fill_gaps_buckets.into_iter(), @@ -434,7 +450,9 @@ fn intermediate_buckets_to_final_buckets_fill_gaps( .map(|intermediate_bucket| { intermediate_bucket.into_final_bucket_entry(sub_aggregation, limits) }) - .collect::>>() + .collect::>>()?; + + Ok(final_buckets) } // Convert to BucketEntry @@ -445,14 +463,20 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets( sub_aggregation: &Aggregations, limits: &AggregationLimits, ) -> crate::Result> { + // Normalization is column type dependent. + // The request used in the the call to final is not yet be normalized. + // Normalization is changing the precision from milliseconds to nanoseconds. + let mut histogram_req = histogram_req.clone(); + if let Some(column_type) = column_type { + histogram_req.normalize(column_type); + } let mut buckets = if histogram_req.min_doc_count() == 0 { // With min_doc_count != 0, we may need to add buckets, so that there are no // gaps, since intermediate result does not contain empty buckets (filtered to // reduce serialization size). - intermediate_buckets_to_final_buckets_fill_gaps( buckets, - histogram_req, + &histogram_req, sub_aggregation, limits, )? 
@@ -467,10 +491,12 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets( }; // If we have a date type on the histogram buckets, we add the `key_as_string` field as rfc339 + // and normalize from nanoseconds to milliseconds if column_type == Some(ColumnType::DateTime) { for bucket in buckets.iter_mut() { - if let crate::aggregation::Key::F64(val) = bucket.key { - let key_as_string = format_date(val as i64)?; + if let crate::aggregation::Key::F64(ref mut val) = bucket.key { + *val /= 1_000_000.0; + let key_as_string = format_date_ms(*val as i64)?; bucket.key_as_string = Some(key_as_string); } } @@ -1203,7 +1229,7 @@ mod tests { "histogram": { "histogram": { "field": "date", - "interval": 86400000000000.0, // one day in nano seconds + "interval": 86400000.0, // one day in milliseconds seconds }, } })) @@ -1213,14 +1239,14 @@ mod tests { let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; - assert_eq!(res["histogram"]["buckets"][0]["key"], 1546300800000000000.0); + assert_eq!(res["histogram"]["buckets"][0]["key"], 1546300800000.0); assert_eq!( res["histogram"]["buckets"][0]["key_as_string"], "2019-01-01T00:00:00Z" ); assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 1); - assert_eq!(res["histogram"]["buckets"][1]["key"], 1546387200000000000.0); + assert_eq!(res["histogram"]["buckets"][1]["key"], 1546387200000.0); assert_eq!( res["histogram"]["buckets"][1]["key_as_string"], "2019-01-02T00:00:00Z" @@ -1228,7 +1254,7 @@ mod tests { assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 5); - assert_eq!(res["histogram"]["buckets"][2]["key"], 1546473600000000000.0); + assert_eq!(res["histogram"]["buckets"][2]["key"], 1546473600000.0); assert_eq!( res["histogram"]["buckets"][2]["key_as_string"], "2019-01-03T00:00:00Z" diff --git a/src/aggregation/date.rs b/src/aggregation/date.rs index 97befe7b9e..8ec47f3245 100644 --- a/src/aggregation/date.rs +++ b/src/aggregation/date.rs @@ -14,3 +14,16 @@ pub(crate) fn format_date(val: i64) -> crate::Result { .map_err(|_err| TantivyError::InvalidArgument("Could not serialize date".to_string()))?; Ok(key_as_string) } + +pub(crate) fn format_date_ms(val: i64) -> crate::Result { + let datetime = + OffsetDateTime::from_unix_timestamp_nanos(val as i128 * 1_000_000).map_err(|err| { + TantivyError::InvalidArgument(format!( + "Could not convert {val:?} to OffsetDateTime, err {err:?}" + )) + })?; + let key_as_string = datetime + .format(&Rfc3339) + .map_err(|_err| TantivyError::InvalidArgument("Could not serialize date".to_string()))?; + Ok(key_as_string) +} diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index d3fd9e7ca6..195cd52338 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -15,8 +15,7 @@ use super::agg_req::{Aggregation, AggregationVariants, Aggregations}; use super::agg_result::{AggregationResult, BucketResult, MetricResult, RangeBucketEntry}; use super::bucket::{ cut_off_buckets, get_agg_name_and_property, intermediate_histogram_buckets_to_final_buckets, - GetDocCount, Order, OrderTarget, RangeAggregation, SegmentHistogramBucketEntry, - TermsAggregation, + GetDocCount, Order, OrderTarget, RangeAggregation, TermsAggregation, }; use super::metric::{ IntermediateAverage, IntermediateCount, IntermediateMax, IntermediateMin, IntermediateStats, @@ -646,16 +645,6 @@ impl IntermediateHistogramBucketEntry { } } -impl From for IntermediateHistogramBucketEntry { - fn from(entry: SegmentHistogramBucketEntry) -> Self { - 
IntermediateHistogramBucketEntry { - key: entry.key, - doc_count: entry.doc_count, - sub_aggregation: Default::default(), - } - } -} - /// This is the range entry for a bucket, which contains a key, count, and optionally /// sub_aggregations. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 2c5e604f65..da03ccaaa4 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -109,13 +109,13 @@ pub(crate) fn build_single_agg_segment_collector( accessor_idx, )?)), Histogram(histogram) => Ok(Box::new(SegmentHistogramCollector::from_req_and_validate( - histogram, + histogram.clone(), &mut req.sub_aggregation, req.field_type, accessor_idx, )?)), DateHistogram(histogram) => Ok(Box::new(SegmentHistogramCollector::from_req_and_validate( - &histogram.to_histogram_req()?, + histogram.to_histogram_req()?, &mut req.sub_aggregation, req.field_type, accessor_idx, From 22c3aa4a7affb1eca9bfd6c4ba73e8aee990ac70 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 16 May 2023 15:50:32 +0800 Subject: [PATCH 2/8] add missing unit long variants --- src/aggregation/bucket/histogram/date_histogram.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/aggregation/bucket/histogram/date_histogram.rs b/src/aggregation/bucket/histogram/date_histogram.rs index d745236a03..8d41dd80d1 100644 --- a/src/aggregation/bucket/histogram/date_histogram.rs +++ b/src/aggregation/bucket/histogram/date_histogram.rs @@ -219,15 +219,16 @@ fn parse_into_milliseconds(input: &str) -> Result { .map_err(|_err| DateHistogramParseError::NumberMissing(input.to_string()))?; let unit_in_ms = match unit { - "ms" => 1, - "s" => 1000, - "m" => 60 * 1000, - "h" => 60 * 60 * 1000, - "d" => 24 * 60 * 60 * 1000, + "ms" | "milliseconds" => 1, + "s" | "seconds" => 1000, + "m" | "minutes" => 60 * 1000, + "h" | "hours" => 60 * 60 * 1000, + "d" | "days" => 24 * 60 * 60 * 1000, _ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string()).into()), }; let val = number * unit_in_ms; + // The field type is in nanoseconds precision, so validate the value to fit the range val.checked_mul(1_000_000) .ok_or_else(|| DateHistogramParseError::OutOfBounds(input.to_string()))?; @@ -249,6 +250,7 @@ mod tests { fn test_parse_into_millisecs() { assert_eq!(parse_into_milliseconds("1m").unwrap(), 60_000); assert_eq!(parse_into_milliseconds("2m").unwrap(), 120_000); + assert_eq!(parse_into_milliseconds("2minutes").unwrap(), 120_000); assert_eq!( parse_into_milliseconds("2y").unwrap_err(), DateHistogramParseError::UnitNotRecognized("y".to_string()).into() From ff7bf3082debe0fddf34c58803e5f2f401e27c6b Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 16 May 2023 16:31:52 +0800 Subject: [PATCH 3/8] use single thread to avoid handling test case --- .github/workflows/coverage.yml | 2 +- .github/workflows/test.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index fb6a1feb36..72a3fa0874 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -16,7 +16,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - uses: taiki-e/install-action@cargo-llvm-cov - name: Generate code coverage - run: cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info + run: RUST_TEST_THREADS=1 cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path 
lcov.info - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 continue-on-error: true diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 62b7065151..772ca621bb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -68,7 +68,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: Run tests - run: cargo +stable nextest run --features ${{ matrix.features.flags }} --verbose --workspace + run: RUST_TEST_THREADS=1 cargo +stable nextest run --features ${{ matrix.features.flags }} --verbose --workspace - name: Run doctests run: cargo +stable test --doc --features ${{ matrix.features.flags }} --verbose --workspace From d2d71dc96ae7bdd97d5e44b4463d1a93d8eb87f4 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 16 May 2023 17:42:38 +0800 Subject: [PATCH 4/8] fix docs --- src/aggregation/agg_req.rs | 2 +- src/aggregation/bucket/mod.rs | 3 +-- src/aggregation/metric/percentiles.rs | 6 +++--- src/aggregation/mod.rs | 7 ++++--- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/aggregation/agg_req.rs b/src/aggregation/agg_req.rs index 1aa7fa6a5e..d8d30f6ac2 100644 --- a/src/aggregation/agg_req.rs +++ b/src/aggregation/agg_req.rs @@ -39,7 +39,7 @@ use super::metric::{ }; /// The top-level aggregation request structure, which contains [`Aggregation`] and their user -/// defined names. It is also used in [buckets](BucketAggregation) to define sub-aggregations. +/// defined names. It is also used in buckets aggregations to define sub-aggregations. /// /// The key is the user defined name of the aggregation. pub type Aggregations = HashMap; diff --git a/src/aggregation/bucket/mod.rs b/src/aggregation/bucket/mod.rs index 3ccc53e974..711b6c2f3c 100644 --- a/src/aggregation/bucket/mod.rs +++ b/src/aggregation/bucket/mod.rs @@ -1,7 +1,6 @@ //! Module for all bucket aggregations. //! -//! BucketAggregations create buckets of documents -//! [`BucketAggregation`](super::agg_req::BucketAggregation). +//! BucketAggregations create buckets of documents. //! Each bucket is associated with a rule which //! determines whether or not a document in the falls into it. In other words, the buckets //! effectively define document sets. Buckets are not necessarily disjunct, therefore a document can diff --git a/src/aggregation/metric/percentiles.rs b/src/aggregation/metric/percentiles.rs index 2326054c03..4ab6f31756 100644 --- a/src/aggregation/metric/percentiles.rs +++ b/src/aggregation/metric/percentiles.rs @@ -34,7 +34,7 @@ use crate::{DocId, TantivyError}; /// following example demonstrates a request for the percentiles of the "load_time" /// field: /// -/// ```json +/// ```JSON /// { /// "percentiles": { /// "field": "load_time" @@ -46,7 +46,7 @@ use crate::{DocId, TantivyError}; /// 25, 50 (median), 75, 95, and 99). You can also customize the percentiles you want to /// calculate by providing an array of values in the "percents" parameter: /// -/// ```json +/// ```JSON /// { /// "percentiles": { /// "field": "load_time", @@ -90,7 +90,7 @@ fn default_as_true() -> bool { } impl PercentilesAggregationReq { - /// Creates a new [`PercentilesAggregation`] instance from a field name. + /// Creates a new [`PercentilesAggregationReq`] instance from a field name. pub fn from_field_name(field_name: String) -> Self { PercentilesAggregationReq { field: field_name, diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index 7a7fc0ef42..93116e0d9c 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -25,8 +25,7 @@ //! 
Aggregations request and result structures de/serialize into elasticsearch compatible JSON. //! //! Notice: Intermediate aggregation results should not be de/serialized via JSON format. -//! See compatibility tests here: https://github.com/PSeitz/test_serde_formats -//! TLDR: use ciborium. +//! Postcard is a good choice. //! //! ```verbatim //! let agg_req: Aggregations = serde_json::from_str(json_request_string).unwrap(); @@ -39,6 +38,7 @@ //! ## Supported Aggregations //! - [Bucket](bucket) //! - [Histogram](bucket::HistogramAggregation) +//! - [DateHistogram](bucket::DateHistogramAggregationReq) //! - [Range](bucket::RangeAggregation) //! - [Terms](bucket::TermsAggregation) //! - [Metric](metric) @@ -48,6 +48,7 @@ //! - [Max](metric::MaxAggregation) //! - [Sum](metric::SumAggregation) //! - [Count](metric::CountAggregation) +//! - [Percentiles](metric::PercentilesAggregationReq) //! //! # Example //! Compute the average metric, by building [`agg_req::Aggregations`], which is built from an @@ -121,7 +122,7 @@ //! [`merge_fruits`](intermediate_agg_result::IntermediateAggregationResults::merge_fruits) method //! to merge multiple results. The merged result can then be converted into //! [`AggregationResults`](agg_result::AggregationResults) via the -//! [`into_final_bucket_result`](intermediate_agg_result::IntermediateAggregationResults::into_final_bucket_result) method. +//! [`into_final_result`](intermediate_agg_result::IntermediateAggregationResults::into_final_result) method. mod agg_limits; pub mod agg_req; From 6688d43070d4dcfa707c860522d785dcd4ec7ce2 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 16 May 2023 18:31:52 +0800 Subject: [PATCH 5/8] revert CI --- .github/workflows/coverage.yml | 2 +- .github/workflows/test.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 72a3fa0874..fb6a1feb36 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -16,7 +16,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - uses: taiki-e/install-action@cargo-llvm-cov - name: Generate code coverage - run: RUST_TEST_THREADS=1 cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info + run: cargo +nightly llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 continue-on-error: true diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 772ca621bb..62b7065151 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -68,7 +68,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: Run tests - run: RUST_TEST_THREADS=1 cargo +stable nextest run --features ${{ matrix.features.flags }} --verbose --workspace + run: cargo +stable nextest run --features ${{ matrix.features.flags }} --verbose --workspace - name: Run doctests run: cargo +stable test --doc --features ${{ matrix.features.flags }} --verbose --workspace From 63aba597d53541edc507c9561e50261b8c7238f0 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 16 May 2023 19:40:14 +0800 Subject: [PATCH 6/8] cleanup --- src/aggregation/bucket/histogram/date_histogram.rs | 2 +- src/aggregation/bucket/histogram/histogram.rs | 7 +++---- src/aggregation/date.rs | 13 ------------- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/src/aggregation/bucket/histogram/date_histogram.rs b/src/aggregation/bucket/histogram/date_histogram.rs index 8d41dd80d1..c316eed83c 100644 --- 
a/src/aggregation/bucket/histogram/date_histogram.rs +++ b/src/aggregation/bucket/histogram/date_histogram.rs @@ -77,7 +77,7 @@ pub struct DateHistogramAggregationReq { /// hard_bounds only limits the buckets, to force a range set both extended_bounds and /// hard_bounds to the same range. /// - /// Needs to be provided as timestamp in milliseconds precision. + /// Needs to be provided as timestamp in millisecond precision. /// /// ## Example /// ```json diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index cf01f6cf4c..097190a1ec 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -13,8 +13,6 @@ use crate::aggregation::agg_req_with_accessor::{ AggregationWithAccessor, AggregationsWithAccessor, }; use crate::aggregation::agg_result::BucketEntry; -use crate::aggregation::date::format_date_ms; -use crate::aggregation::f64_from_fastfield_u64; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry, @@ -22,6 +20,7 @@ use crate::aggregation::intermediate_agg_result::{ use crate::aggregation::segment_agg_result::{ build_segment_agg_collector, AggregationLimits, SegmentAggregationCollector, }; +use crate::aggregation::{f64_from_fastfield_u64, format_date}; use crate::TantivyError; /// Histogram is a bucket aggregation, where buckets are created dynamically for given `interval`. @@ -412,7 +411,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps( // memory check upfront let (_, first_bucket_num, last_bucket_num) = generate_bucket_pos_with_opt_minmax(histogram_req, min_max); - // It's based user input, so we need to account for overflows + // It's based on user input, so we need to account for overflows let added_buckets = ((last_bucket_num.saturating_sub(first_bucket_num)).max(0) as u64) .saturating_sub(buckets.len() as u64); limits.add_memory_consumed( @@ -495,8 +494,8 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets( if column_type == Some(ColumnType::DateTime) { for bucket in buckets.iter_mut() { if let crate::aggregation::Key::F64(ref mut val) = bucket.key { + let key_as_string = format_date(*val as i64)?; *val /= 1_000_000.0; - let key_as_string = format_date_ms(*val as i64)?; bucket.key_as_string = Some(key_as_string); } } diff --git a/src/aggregation/date.rs b/src/aggregation/date.rs index 8ec47f3245..97befe7b9e 100644 --- a/src/aggregation/date.rs +++ b/src/aggregation/date.rs @@ -14,16 +14,3 @@ pub(crate) fn format_date(val: i64) -> crate::Result { .map_err(|_err| TantivyError::InvalidArgument("Could not serialize date".to_string()))?; Ok(key_as_string) } - -pub(crate) fn format_date_ms(val: i64) -> crate::Result { - let datetime = - OffsetDateTime::from_unix_timestamp_nanos(val as i128 * 1_000_000).map_err(|err| { - TantivyError::InvalidArgument(format!( - "Could not convert {val:?} to OffsetDateTime, err {err:?}" - )) - })?; - let key_as_string = datetime - .format(&Rfc3339) - .map_err(|_err| TantivyError::InvalidArgument("Could not serialize date".to_string()))?; - Ok(key_as_string) -} From bdf0a0b617413786a72c7f3b75ec1be9aa8c5446 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 17 May 2023 21:11:45 +0800 Subject: [PATCH 7/8] improve docs --- src/aggregation/bucket/histogram/date_histogram.rs | 7 +++++++ src/aggregation/metric/percentiles.rs | 11 +++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git 
a/src/aggregation/bucket/histogram/date_histogram.rs b/src/aggregation/bucket/histogram/date_histogram.rs index c316eed83c..aa024177c4 100644 --- a/src/aggregation/bucket/histogram/date_histogram.rs +++ b/src/aggregation/bucket/histogram/date_histogram.rs @@ -67,6 +67,13 @@ pub struct DateHistogramAggregationReq { pub fixed_interval: Option, /// Intervals implicitly defines an absolute grid of buckets `[interval * k, interval * (k + /// 1))`. + /// + /// Offset makes it possible to shift this grid into + /// `[offset + interval * k, offset + interval * (k + 1))`. Offset has to be in the range [0, + /// interval). + /// + /// The `offset` parameter is has the same syntax as the `fixed_interval` parameter, but + /// also allows for negative values. pub offset: Option, /// The minimum number of documents in a bucket to be returned. Defaults to 0. pub min_doc_count: Option, diff --git a/src/aggregation/metric/percentiles.rs b/src/aggregation/metric/percentiles.rs index 4ab6f31756..cc3aff1ea4 100644 --- a/src/aggregation/metric/percentiles.rs +++ b/src/aggregation/metric/percentiles.rs @@ -21,17 +21,16 @@ use crate::{DocId, TantivyError}; /// data falls. For instance, the 95th percentile indicates the value below which /// 95% of the data points can be found. /// -/// This aggregation can be particularly interesting for analyzing website load -/// times. By computing the percentiles of load times, you can get insights into -/// how quickly your website loads for different users and identify areas where -/// improvements can be made. +/// This aggregation can be particularly interesting for analyzing website or service response +/// times. For example, if the 95th percentile website load time is significantly higher than the +/// median, this indicates that a small percentage of users are experiencing much slower load times +/// than the majority. /// /// To use the percentiles aggregation, you'll need to provide a field to /// aggregate on. In the case of website load times, this would typically be a /// field containing the duration of time it takes for the site to load. /// -/// The JSON format for a percentiles aggregation request is straightforward. The -/// following example demonstrates a request for the percentiles of the "load_time" +/// The following example demonstrates a request for the percentiles of the "load_time" /// field: /// /// ```JSON From ae94bf23b54cce88b03a6cbff5da8cb2e883a659 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Fri, 19 May 2023 06:00:59 +0200 Subject: [PATCH 8/8] Update src/aggregation/bucket/histogram/histogram.rs Co-authored-by: Paul Masurel --- src/aggregation/bucket/histogram/histogram.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index 097190a1ec..678a8e1e6d 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -330,7 +330,7 @@ impl SegmentHistogramCollector { ) -> crate::Result { let mut buckets = Vec::with_capacity(self.buckets.len()); - for (bucket_pos, bucket) in self.buckets.into_iter() { + for (bucket_pos, bucket) in self.buckets { let bucket_res = bucket.into_intermediate_bucket_entry( self.sub_aggregations.get(&bucket_pos).cloned(), &agg_with_accessor.sub_aggregation,
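
---

For reference, a minimal test-style sketch of the unit handling this series introduces: `fixed_interval`/`offset` strings now parse into milliseconds, and `HistogramAggregation::normalize` scales the request by 1_000_000 into the nanosecond domain of the date fast field. This is an illustrative addition, not part of the patches above — the test name is hypothetical, and it assumes it would sit in the `tests` module of `date_histogram.rs` with `HistogramAggregation`, `ColumnType`, and the parse helpers in scope (e.g. via `use super::*;`) and that `HistogramAggregation` derives `Default`.

```rust
#[test]
fn test_ms_to_ns_normalization_sketch() {
    // "1d" parses into milliseconds: 24 * 60 * 60 * 1000.
    assert_eq!(parse_into_milliseconds("1d").unwrap(), 86_400_000);
    // Offsets take an optional leading sign but share the same unit syntax.
    assert_eq!(parse_offset_into_milliseconds("-2ms").unwrap(), -2);

    // normalize() converts the millisecond-based request into nanoseconds
    // when the target column is a date column.
    let mut req = HistogramAggregation {
        field: "date".to_string(),
        interval: 86_400_000.0, // one day in milliseconds
        ..Default::default()
    };
    req.normalize(ColumnType::DateTime);
    assert_eq!(req.interval, 86_400_000.0 * 1_000_000.0); // one day in nanoseconds
}
```

The same conversion explains the doc and test changes throughout patch 1: request bounds and returned bucket keys are expressed in epoch milliseconds (e.g. `1420070400000.0` for `2015-01-01T00:00:00Z`), while the fast field itself keeps nanosecond precision.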