From f569223aef98f42d59ec755d43be0248e9290fd9 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Mon, 15 Jan 2024 12:34:59 +0100 Subject: [PATCH] Primary caching 9: timeless latest-at support (#4721) Introduces a dedicated cache bucket for timeless data and properly forwards the information through all APIs downstream. --- Part of the primary caching series of PR (index search, joins, deserialization): - #4592 - #4593 - #4659 - #4680 - #4681 - #4698 - #4711 - #4712 - #4721 - #4726 - #4773 - #4784 - #4785 - #4793 - #4800 --- .../re_log_types/src/time_point/time_int.rs | 5 - crates/re_query_cache/src/cache.rs | 10 +- crates/re_query_cache/src/query.rs | 122 +++++++++------ .../src/visualizers/entity_iterator.rs | 2 +- .../src/visualizer_system.rs | 12 +- .../src/visualizer_system.rs | 146 ++++++++++-------- 6 files changed, 169 insertions(+), 128 deletions(-) diff --git a/crates/re_log_types/src/time_point/time_int.rs b/crates/re_log_types/src/time_point/time_int.rs index c7ab759b4c69..11ceb1b3e465 100644 --- a/crates/re_log_types/src/time_point/time_int.rs +++ b/crates/re_log_types/src/time_point/time_int.rs @@ -68,11 +68,6 @@ impl TimeInt { pub fn abs(&self) -> Self { Self(self.0.saturating_abs()) } - - #[inline] - pub fn is_timeless(&self) -> bool { - self == &Self::BEGINNING - } } impl From for TimeInt { diff --git a/crates/re_query_cache/src/cache.rs b/crates/re_query_cache/src/cache.rs index cefaf20e362d..7daba9d8260c 100644 --- a/crates/re_query_cache/src/cache.rs +++ b/crates/re_query_cache/src/cache.rs @@ -49,7 +49,6 @@ static CACHES: Lazy = Lazy::new(Caches::default); // // TODO(cmc): Store subscriber and cache invalidation. // TODO(#4730): SizeBytes support + size stats + mem panel -// TODO(cmc): timeless caching support #[derive(Default)] pub struct Caches { latest_at: RwLock>>>, @@ -351,4 +350,13 @@ pub struct LatestAtCache { /// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0` /// can result in a data time of `T`. pub per_data_time: BTreeMap>>, + + /// Dedicated bucket for timeless data, if any. + /// + /// Query time and data time are one and the same in the timeless case, therefore we only need + /// this one bucket. + // + // NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common + // timeful case. + pub timeless: Option, } diff --git a/crates/re_query_cache/src/query.rs b/crates/re_query_cache/src/query.rs index 077e81f381aa..9fc068596df3 100644 --- a/crates/re_query_cache/src/query.rs +++ b/crates/re_query_cache/src/query.rs @@ -66,7 +66,7 @@ where R1: Component + Send + Sync + 'static, F: FnMut( ( - (TimeInt, RowId), + (Option, RowId), MaybeCachedComponentData<'_, InstanceKey>, MaybeCachedComponentData<'_, R1>, ), @@ -93,7 +93,7 @@ macro_rules! impl_query_archetype { $($comp: Component + Send + Sync + 'static,)* F: FnMut( ( - (TimeInt, RowId), + (Option, RowId), MaybeCachedComponentData<'_, InstanceKey>, $(MaybeCachedComponentData<'_, $pov>,)+ $(MaybeCachedComponentData<'_, Option<$comp>>,)* @@ -107,7 +107,7 @@ macro_rules! impl_query_archetype { ); - let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> { + let mut iter_results = |timeless: bool, bucket: &crate::CacheBucket| -> crate::Result<()> { re_tracing::profile_scope!("iter"); let it = itertools::izip!( @@ -117,9 +117,9 @@ macro_rules! impl_query_archetype { .ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+ $(bucket.iter_component_opt::<$comp>() .ok_or_else(|| re_query::ComponentNotFoundError(<$comp>::name()))?,)* - ).map(|(time, instance_keys, $($pov,)+ $($comp,)*)| { + ).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| { ( - *time, + ((!timeless).then_some(*time), *row_id), MaybeCachedComponentData::Cached(instance_keys), $(MaybeCachedComponentData::Cached($pov),)+ $(MaybeCachedComponentData::Cached($comp),)* @@ -133,68 +133,102 @@ macro_rules! impl_query_archetype { Ok(()) }; + + let upsert_results = | + data_time: TimeInt, + arch_view: &::re_query::ArchetypeView, + bucket: &mut crate::CacheBucket, + | -> crate::Result<()> { + re_log::trace!(data_time=?data_time, ?data_time, "fill"); + + // Grabbing the current time is quite costly on web. + #[cfg(not(target_arch = "wasm32"))] + let now = web_time::Instant::now(); + + bucket.[]::(data_time, &arch_view)?; + + #[cfg(not(target_arch = "wasm32"))] + { + let elapsed = now.elapsed(); + ::re_log::trace!( + store_id=%store.id(), + %entity_path, + archetype=%A::name(), + "cached new entry in {elapsed:?} ({:0.3} entries/s)", + 1f64 / elapsed.as_secs_f64() + ); + } + + Ok(()) + }; + let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| { re_tracing::profile_scope!("latest_at", format!("{query:?}")); - let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache; + let crate::LatestAtCache { per_query_time, per_data_time, timeless } = latest_at_cache; let query_time_bucket_at_query_time = match per_query_time.entry(query.at) { std::collections::btree_map::Entry::Occupied(query_time_bucket_at_query_time) => { // Fastest path: we have an entry for this exact query time, no need to look any // further. - return iter_results(&query_time_bucket_at_query_time.get().read()); + re_log::trace!(query_time=?query.at, "cache hit (query time)"); + return iter_results(false, &query_time_bucket_at_query_time.get().read()); } entry @ std::collections::btree_map::Entry::Vacant(_) => entry, }; let arch_view = query_archetype::(store, &query, entity_path)?; - // TODO(cmc): actual timeless caching support. - let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN); + let data_time = arch_view.data_time(); // Fast path: we've run the query and realized that we already have the data for the resulting // _data_ time, so let's use that to avoid join & deserialization costs. - if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) { - *query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time); + if let Some(data_time) = data_time { // Reminder: `None` means timeless. + if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) { + re_log::trace!(query_time=?query.at, ?data_time, "cache hit (data time)"); + + *query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time); - // We now know for a fact that a query at that data time would yield the same - // results: copy the bucket accordingly so that the next cache hit for that query - // time ends up taking the fastest path. - let query_time_bucket_at_data_time = per_query_time.entry(data_time); - *query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time); + // We now know for a fact that a query at that data time would yield the same + // results: copy the bucket accordingly so that the next cache hit for that query + // time ends up taking the fastest path. + let query_time_bucket_at_data_time = per_query_time.entry(data_time); + *query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time); - return iter_results(&data_time_bucket_at_data_time.read()); + return iter_results(false, &data_time_bucket_at_data_time.read()); + } + } else { + if let Some(timeless_bucket) = timeless.as_ref() { + re_log::trace!(query_time=?query.at, "cache hit (data time, timeless)"); + return iter_results(true, timeless_bucket); + } } let query_time_bucket_at_query_time = query_time_bucket_at_query_time.or_default(); // Slowest path: this is a complete cache miss. - { - re_tracing::profile_scope!("fill"); + if let Some(data_time) = data_time { // Reminder: `None` means timeless. + re_log::trace!(query_time=?query.at, ?data_time, "cache miss"); - // Grabbing the current time is quite costly on web. - #[cfg(not(target_arch = "wasm32"))] - let now = web_time::Instant::now(); - - let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write(); - query_time_bucket_at_query_time.[]::(query.at, &arch_view)?; - - #[cfg(not(target_arch = "wasm32"))] { - let elapsed = now.elapsed(); - ::re_log::trace!( - store_id=%store.id(), - %entity_path, - archetype=%A::name(), - "cached new entry in {elapsed:?} ({:0.3} entries/s)", - 1f64 / elapsed.as_secs_f64() - ); + let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write(); + upsert_results(data_time, &arch_view, &mut query_time_bucket_at_query_time)?; } - } - let data_time_bucket_at_data_time = per_data_time.entry(data_time); - *data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time); + let data_time_bucket_at_data_time = per_data_time.entry(data_time); + *data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time); + + iter_results(false, &query_time_bucket_at_query_time.read()) + } else { + re_log::trace!(query_time=?query.at, "cache miss (timeless)"); + + let mut timeless_bucket = crate::CacheBucket::default(); - iter_results(&query_time_bucket_at_query_time.read()) + upsert_results(TimeInt::MIN, &arch_view, &mut timeless_bucket)?; + iter_results(true, &timeless_bucket)?; + + *timeless = Some(timeless_bucket); + Ok(()) + } }; @@ -209,8 +243,7 @@ macro_rules! impl_query_archetype { for arch_view in arch_views { let data = ( - // TODO(cmc): actual timeless caching support. - (arch_view.data_time().unwrap_or(TimeInt::MIN), arch_view.primary_row_id()), + (arch_view.data_time(), arch_view.primary_row_id()), MaybeCachedComponentData::Raw(arch_view.iter_instance_keys().collect()), $(MaybeCachedComponentData::Raw(arch_view.iter_required_component::<$pov>()?.collect()),)+ $(MaybeCachedComponentData::Raw(arch_view.iter_optional_component::<$comp>()?.collect()),)* @@ -228,8 +261,7 @@ macro_rules! impl_query_archetype { let arch_view = ::re_query::query_archetype::(store, query, entity_path)?; let data = ( - // TODO(cmc): actual timeless caching support. - (arch_view.data_time().unwrap_or(TimeInt::MIN), arch_view.primary_row_id()), + (arch_view.data_time(), arch_view.primary_row_id()), MaybeCachedComponentData::Raw(arch_view.iter_instance_keys().collect()), $(MaybeCachedComponentData::Raw(arch_view.iter_required_component::<$pov>()?.collect()),)+ $(MaybeCachedComponentData::Raw(arch_view.iter_optional_component::<$comp>()?.collect()),)* @@ -286,7 +318,7 @@ where R1: Component + Send + Sync + 'static, F: FnMut( ( - (TimeInt, RowId), + (Option, RowId), MaybeCachedComponentData<'_, InstanceKey>, MaybeCachedComponentData<'_, R1>, ), @@ -318,7 +350,7 @@ macro_rules! impl_query_archetype_with_history { $($comp: Component + Send + Sync + 'static,)* F: FnMut( ( - (TimeInt, RowId), + (Option, RowId), MaybeCachedComponentData<'_, InstanceKey>, $(MaybeCachedComponentData<'_, $pov>,)+ $(MaybeCachedComponentData<'_, Option<$comp>>,)* diff --git a/crates/re_space_view_spatial/src/visualizers/entity_iterator.rs b/crates/re_space_view_spatial/src/visualizers/entity_iterator.rs index 6fd09ed3e593..b7fdc5f51f0a 100644 --- a/crates/re_space_view_spatial/src/visualizers/entity_iterator.rs +++ b/crates/re_space_view_spatial/src/visualizers/entity_iterator.rs @@ -137,7 +137,7 @@ macro_rules! impl_process_archetype { &EntityPath, &EntityProperties, &SpatialSceneEntityContext<'_>, - (TimeInt, RowId), + (Option, RowId), &[InstanceKey], $(&[$pov],)* $(&[Option<$comp>],)* diff --git a/crates/re_space_view_text_log/src/visualizer_system.rs b/crates/re_space_view_text_log/src/visualizer_system.rs index a93847ca593d..fd63d9460da9 100644 --- a/crates/re_space_view_text_log/src/visualizer_system.rs +++ b/crates/re_space_view_text_log/src/visualizer_system.rs @@ -1,6 +1,6 @@ use re_data_store::TimeRange; use re_entity_db::EntityPath; -use re_log_types::{RowId, TimeInt}; +use re_log_types::RowId; use re_types::{ archetypes::TextLog, components::{Color, Text, TextLogLevel}, @@ -61,6 +61,8 @@ impl VisualizerSystem for TextLogSystem { let store = ctx.entity_db.store(); for data_result in query.iter_visible_data_results(Self::identifier()) { + re_tracing::profile_scope!("primary", &data_result.entity_path.to_string()); + // We want everything, for all times: let timeline_query = re_data_store::RangeQuery::new(query.timeline, TimeRange::EVERYTHING); @@ -77,8 +79,7 @@ impl VisualizerSystem for TextLogSystem { self.entries.push(Entry { row_id, entity_path: data_result.entity_path.clone(), - // TODO(cmc): real support for timeless data in caches. - time: (time != TimeInt::MIN).then(|| time.as_i64()), + time: time.map(|time| time.as_i64()), color: *color, body: body.clone(), level: level.clone(), @@ -88,11 +89,6 @@ impl VisualizerSystem for TextLogSystem { )?; } - { - re_tracing::profile_scope!("sort"); - self.entries.sort_by_key(|entry| entry.time); - } - Ok(Vec::new()) } diff --git a/crates/re_space_view_time_series/src/visualizer_system.rs b/crates/re_space_view_time_series/src/visualizer_system.rs index 52ccb72aa353..16a833d31019 100644 --- a/crates/re_space_view_time_series/src/visualizer_system.rs +++ b/crates/re_space_view_time_series/src/visualizer_system.rs @@ -124,82 +124,92 @@ impl TimeSeriesSystem { for data_result in query.iter_visible_data_results(Self::identifier()) { let mut points = Vec::new(); - let annotations = self.annotation_map.find(&data_result.entity_path); - let annotation_info = annotations - .resolved_class_description(None) - .annotation_info(); - let default_color = DefaultColor::EntityPath(&data_result.entity_path); - - let visible_history = match query.timeline.typ() { - re_log_types::TimeType::Time => { - data_result.accumulated_properties().visible_history.nanos - } - re_log_types::TimeType::Sequence => { - data_result - .accumulated_properties() - .visible_history - .sequences - } - }; - let (from, to) = if data_result.accumulated_properties().visible_history.enabled { - ( - visible_history.from(query.latest_at), - visible_history.to(query.latest_at), - ) - } else { - (i64::MIN.into(), i64::MAX.into()) - }; + { + re_tracing::profile_scope!("primary", &data_result.entity_path.to_string()); + + let annotations = self.annotation_map.find(&data_result.entity_path); + let annotation_info = annotations + .resolved_class_description(None) + .annotation_info(); + let default_color = DefaultColor::EntityPath(&data_result.entity_path); - let query = re_data_store::RangeQuery::new(query.timeline, TimeRange::new(from, to)); - - re_query_cache::query_archetype_pov1_comp4::< - TimeSeriesScalar, - Scalar, - ScalarScattering, - Color, - Radius, - Text, - _, - >( - ctx.app_options.experimental_primary_caching_series, - store, - &query.clone().into(), - &data_result.entity_path, - |((time, _row_id), _, scalars, scatterings, colors, radii, labels)| { - re_tracing::profile_scope!("primary"); - - for (scalar, scattered, color, radius, label) in itertools::izip!( - scalars.iter(), - scatterings.iter(), - colors.iter(), - radii.iter(), - labels.iter() - ) { - let color = - annotation_info.color(color.map(|c| c.to_array()), default_color); - let label = annotation_info.label(label.as_ref().map(|l| l.as_str())); - - const DEFAULT_RADIUS: f32 = 0.75; - - points.push(PlotPoint { - time: time.as_i64(), - value: scalar.0, - attrs: PlotPointAttrs { - label, - color, - radius: radius.map_or(DEFAULT_RADIUS, |r| r.0), - scattered: scattered.map_or(false, |s| s.0), - }, - }); + let visible_history = match query.timeline.typ() { + re_log_types::TimeType::Time => { + data_result.accumulated_properties().visible_history.nanos } - }, - )?; + re_log_types::TimeType::Sequence => { + data_result + .accumulated_properties() + .visible_history + .sequences + } + }; + + let (from, to) = if data_result.accumulated_properties().visible_history.enabled { + ( + visible_history.from(query.latest_at), + visible_history.to(query.latest_at), + ) + } else { + (i64::MIN.into(), i64::MAX.into()) + }; + + let query = + re_data_store::RangeQuery::new(query.timeline, TimeRange::new(from, to)); + + re_query_cache::query_archetype_pov1_comp4::< + TimeSeriesScalar, + Scalar, + ScalarScattering, + Color, + Radius, + Text, + _, + >( + ctx.app_options.experimental_primary_caching_series, + store, + &query.clone().into(), + &data_result.entity_path, + |((time, _row_id), _, scalars, scatterings, colors, radii, labels)| { + let Some(time) = time else { + return; + }; // scalars cannot be timeless + + for (scalar, scattered, color, radius, label) in itertools::izip!( + scalars.iter(), + scatterings.iter(), + colors.iter(), + radii.iter(), + labels.iter() + ) { + let color = + annotation_info.color(color.map(|c| c.to_array()), default_color); + let label = annotation_info.label(label.as_ref().map(|l| l.as_str())); + + const DEFAULT_RADIUS: f32 = 0.75; + + points.push(PlotPoint { + time: time.as_i64(), + value: scalar.0, + attrs: PlotPointAttrs { + label, + color, + radius: radius.map_or(DEFAULT_RADIUS, |r| r.0), + scattered: scattered.map_or(false, |s| s.0), + }, + }); + } + }, + )?; + } if points.is_empty() { continue; } + re_tracing::profile_scope!("secondary", &data_result.entity_path.to_string()); + let min_time = store .entity_min_time(&query.timeline, &data_result.entity_path) .map_or(points.first().map_or(0, |p| p.time), |time| time.as_i64());