Skip to content

Commit

Permalink
Primary caching 16: context-free range semantics (#4851)
Browse files Browse the repository at this point in the history
Range queries used to A) return the frame a T-1, B) accumulate state
starting at T-1 and then C) yield frames starting at T.

A) was a huge issue for many reasons, which #4793 took care of by
eliminating both A) and B).

But we need B) for range queries to be context-free, i.e. to be
guaranteed that `Range(5, 10)` and `Range(4, 10)` will return the exact
same data for frame `5`.
This is crucial for multi-tenant settings where those 2 example queries
would share the same cache.

It also is the nicer-nicer version of the range semantics that we wanted
anyway, I just didn't realize back then that it would require so little
changes, or I would've gone straight for that.

---

Part of the primary caching series of PR (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
- #4851
- #4852
- #4853
- #4856
  • Loading branch information
teh-cmc authored Jan 23, 2024
1 parent bd64926 commit 20c768f
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 56 deletions.
51 changes: 42 additions & 9 deletions crates/re_data_store/src/polars_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,54 @@ pub fn range_components<'a, const N: usize>(

let mut state = None;

// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let latest_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut df_latest = None;
if let Some(latest_time) = latest_time {
let df = latest_components(
store,
&LatestAtQuery::new(query.timeline, latest_time),
ent_path,
&components,
join_type,
);

df_latest = Some(df);
}

let primary_col = components
.iter()
.find_position(|component| **component == primary)
.map(|(col, _)| col)
.unwrap(); // asserted on entry

store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
})
// send the latest-at state before anything else
df_latest
.into_iter()
// NOTE: `false` here means we will _not_ yield the latest-at state as an actual
// ArchetypeView!
// That is a very important detail: for overlapping range queries to be correct in a
// multi-tenant cache context, we need to make sure to inherit the latest-at state
// from T-1, while also making sure to _not_ yield the view that comes with that state.
//
// Consider e.g. what happens when one system queries for `range(10, 20)` while another
// queries for `range(9, 20)`: the data at timestamp `10` would differ because of the
// statefulness of range queries!
.map(move |df| (latest_time, false, df))
// followed by the range
.chain(
store
.range(query, ent_path, components)
.map(move |(time, _, cells)| {
(
time,
cells[primary_col].is_some(), // is_primary
dataframe_from_cells(&cells),
)
}),
)
.filter_map(move |(time, is_primary, df)| {
state = Some(join_dataframes(
cluster_key,
Expand Down
39 changes: 22 additions & 17 deletions crates/re_data_store/tests/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,18 +553,18 @@ fn range_impl(store: &mut DataStore) {

// Unit ranges (Color's PoV)

// NOTE: Check out [1] to see what the results would've looked like with latest-at semantics at
// T-1 baked in (like we used to do).
//
// [1]: <https://github.com/rerun-io/rerun/blob/790f391/crates/re_data_store/tests/data_store.rs#L555-L837>

assert_range_components(
TimeRange::new(frame1, frame1),
[Color::name(), Position2D::name()],
&[(
Some(frame1),
&[(Color::name(), &row1)], //
)],
&[
(
Some(frame1),
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
), //
],
);
assert_range_components(
TimeRange::new(frame2, frame2),
Expand All @@ -582,11 +582,11 @@ fn range_impl(store: &mut DataStore) {
&[
(
Some(frame4),
&[(Color::name(), &row4_1)], //
&[(Color::name(), &row4_1), (Position2D::name(), &row3)],
),
(
Some(frame4),
&[(Color::name(), &row4_2)], //
&[(Color::name(), &row4_2), (Position2D::name(), &row3)],
),
(
Some(frame4),
Expand All @@ -613,17 +613,19 @@ fn range_impl(store: &mut DataStore) {
&[
(
Some(frame2),
&[(Position2D::name(), &row2)], //
&[(Position2D::name(), &row2), (Color::name(), &row1)],
), //
],
);
assert_range_components(
TimeRange::new(frame3, frame3),
[Position2D::name(), Color::name()],
&[(
Some(frame3),
&[(Position2D::name(), &row3)], //
)],
&[
(
Some(frame3),
&[(Position2D::name(), &row3), (Color::name(), &row1)],
), //
],
);
assert_range_components(
TimeRange::new(frame4, frame4),
Expand Down Expand Up @@ -653,7 +655,10 @@ fn range_impl(store: &mut DataStore) {
&[
(
Some(frame1),
&[(Color::name(), &row1)], //
&[
(Color::name(), &row1),
(Position2D::name(), &row4_4), // timeless
],
),
(
Some(frame4),
Expand Down
87 changes: 63 additions & 24 deletions crates/re_query/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use itertools::Itertools as _;
use re_data_store::{DataStore, RangeQuery};
use re_data_store::{DataStore, LatestAtQuery, RangeQuery};
use re_log_types::EntityPath;
use re_types_core::{Archetype, ComponentName};

use crate::{ArchetypeView, ComponentWithInstances};
use crate::{get_component_with_instances, ArchetypeView, ComponentWithInstances};

// ---

Expand Down Expand Up @@ -61,29 +61,68 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>(
.take(components.len())
.collect();

store
.range(query, ent_path, components)
.map(move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
// NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far
// into the past don't have a latest-at state!
let query_time = query.range.min.as_i64().checked_sub(1).map(Into::into);

let mut cwis_latest = None;
if let Some(query_time) = query_time {
let mut cwis_latest_raw: Vec<_> = std::iter::repeat_with(|| None)
.take(components.len())
.collect();

// Fetch the latest data for every single component from their respective point-of-views,
// this will allow us to build up the initial state and send an initial latest-at
// entity-view if needed.
for (i, primary) in components.iter().enumerate() {
cwis_latest_raw[i] = get_component_with_instances(
store,
&LatestAtQuery::new(query.timeline, query_time),
ent_path,
*primary,
)
.map(|(_, row_id, cwi)| (row_id, cwi));
}

cwis_latest = Some(cwis_latest_raw);
}

// send the latest-at state before anything else
cwis_latest
.into_iter()
// NOTE: `false` here means we will _not_ yield the latest-at state as an actual
// ArchetypeView!
// That is a very important detail: for overlapping range queries to be correct in a
// multi-tenant cache context, we need to make sure to inherit the latest-at state
// from T-1, while also making sure to _not_ yield the view that comes with that state.
//
// Consider e.g. what happens when one system queries for `range(10, 20)` while another
// queries for `range(9, 20)`: the data at timestamp `10` would differ because of the
// statefulness of range queries!
.map(move |cwis| (query_time, false, cwis))
.chain(store.range(query, ent_path, components).map(
move |(data_time, row_id, mut cells)| {
// NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed
// by the store.
let instance_keys = cells[cluster_col].take().unwrap();
let is_primary = cells[primary_col].is_some();
let cwis = cells
.into_iter()
.map(|cell| {
cell.map(|cell| {
(
row_id,
ComponentWithInstances {
instance_keys: instance_keys.clone(), /* shallow */
values: cell,
},
)
})
})
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
})
.collect::<Vec<_>>();
(data_time, is_primary, cwis)
},
))
.filter_map(move |(data_time, is_primary, cwis)| {
for (i, cwi) in cwis
.into_iter()
Expand Down
6 changes: 1 addition & 5 deletions crates/re_query/tests/archetype_range_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ fn timeless_range() {

// --- First test: `(timepoint1, timepoint3]` ---

// The exclusion of `timepoint1` means latest-at semantics will kick in!

let query = re_data_store::RangeQuery::new(
timepoint1[0].0,
TimeRange::new((timepoint1[0].1.as_i64() + 1).into(), timepoint3[0].1),
Expand Down Expand Up @@ -416,7 +414,7 @@ fn timeless_range() {
Some(Position2D::new(1.0, 2.0)),
Some(Position2D::new(3.0, 4.0)),
];
let colors: Vec<Option<Color>> = vec![None, None];
let colors = vec![None, Some(Color::from_rgb(255, 0, 0))];
let expected = DataCellRow(smallvec![
DataCell::from_native_sparse(instances),
DataCell::from_native_sparse(positions),
Expand Down Expand Up @@ -731,8 +729,6 @@ fn simple_splatted_range() {

// --- Second test: `[timepoint1, timepoint3]` ---

// The inclusion of `timepoint1` means latest-at semantics will _not_ kick in!

let query = re_data_store::RangeQuery::new(
timepoint1[0].0,
TimeRange::new(timepoint1[0].1, timepoint3[0].1),
Expand Down
2 changes: 1 addition & 1 deletion tests/rust/plot_dashboard_stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false

[dependencies]
re_log = { workspace = true, features = ["setup"] }
rerun = { path = "../../../crates/rerun" }
rerun = { path = "../../../crates/rerun", features = ["clap"] }

anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
Expand Down

0 comments on commit 20c768f

Please sign in to comment.