Skip to content

Commit

Permalink
Consistent open/import/log_file behaviors in all common scenarios (#7966)
Browse files Browse the repository at this point in the history

All the major data-loading paths now behave sanely and consistently on
both native and web.

You can try and look at the matrix below for the specific behavior of
each path, but really my advice would be to just play with it to get a
sense of things.

One of these days we'll clearly need a bottom-up, in-hindsight
refactoring of all these things; it is _so_ complex.
But today is not that day.

* Fixes #7965 

---------

Co-authored-by: Emil Ernerfeldt <[email protected]>
  • Loading branch information
teh-cmc and emilk authored Nov 6, 2024
1 parent 44501bd commit 9a5b36a
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 50 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5523,6 +5523,7 @@ dependencies = [
"re_types",
"tempfile",
"thiserror",
"uuid",
"walkdir",
]

Expand Down Expand Up @@ -6562,6 +6563,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror",
"uuid",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_data_loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ once_cell.workspace = true
parking_lot.workspace = true
rayon.workspace = true
thiserror.workspace = true
uuid.workspace = true
walkdir.workspace = true

[target.'cfg(not(any(target_arch = "wasm32")))'.dependencies]
Expand Down
37 changes: 29 additions & 8 deletions crates/store/re_data_loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ pub struct DataLoaderSettings {
/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
pub opened_store_id: Option<re_log_types::StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
pub force_store_info: bool,

/// What should the logged entity paths be prefixed with?
pub entity_path_prefix: Option<EntityPath>,

Expand All @@ -79,6 +85,7 @@ impl DataLoaderSettings {
opened_application_id: Default::default(),
store_id: store_id.into(),
opened_store_id: Default::default(),
force_store_info: false,
entity_path_prefix: Default::default(),
timepoint: Default::default(),
}
Expand All @@ -91,6 +98,7 @@ impl DataLoaderSettings {
opened_application_id,
store_id,
opened_store_id,
force_store_info: _,
entity_path_prefix,
timepoint,
} = self;
Expand Down Expand Up @@ -150,6 +158,8 @@ impl DataLoaderSettings {
}
}

pub type DataLoaderName = String;

/// A [`DataLoader`] loads data from a file path and/or a file's contents.
///
/// Files can be loaded in 3 different ways:
Expand Down Expand Up @@ -205,8 +215,8 @@ impl DataLoaderSettings {
pub trait DataLoader: Send + Sync {
/// Name of the [`DataLoader`].
///
/// Doesn't need to be unique.
fn name(&self) -> String;
/// Should be globally unique.
fn name(&self) -> DataLoaderName;

/// Loads data from a file on the local filesystem and sends it to `tx`.
///
Expand Down Expand Up @@ -314,20 +324,31 @@ impl DataLoaderError {
/// most convenient for them, whether it is raw components, arrow chunks or even
/// full-on [`LogMsg`]s.
pub enum LoadedData {
Chunk(re_log_types::StoreId, Chunk),
ArrowMsg(re_log_types::StoreId, ArrowMsg),
LogMsg(LogMsg),
Chunk(DataLoaderName, re_log_types::StoreId, Chunk),
ArrowMsg(DataLoaderName, re_log_types::StoreId, ArrowMsg),
LogMsg(DataLoaderName, LogMsg),
}

impl LoadedData {
/// Returns the name of the [`DataLoader`] that generated this data.
///
/// Every variant stores the loader's name as its first field, so this simply borrows it.
#[inline]
pub fn data_loader_name(&self) -> &DataLoaderName {
match self {
Self::Chunk(name, ..) | Self::ArrowMsg(name, ..) | Self::LogMsg(name, ..) => name,
}
}

/// Pack the data into a [`LogMsg`].
#[inline]
pub fn into_log_msg(self) -> ChunkResult<LogMsg> {
match self {
Self::Chunk(store_id, chunk) => Ok(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?)),
Self::Chunk(_name, store_id, chunk) => {
Ok(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?))
}

Self::ArrowMsg(store_id, msg) => Ok(LogMsg::ArrowMsg(store_id, msg)),
Self::ArrowMsg(_name, store_id, msg) => Ok(LogMsg::ArrowMsg(store_id, msg)),

Self::LogMsg(msg) => Ok(msg),
Self::LogMsg(_name, msg) => Ok(msg),
}
}
}
Expand Down
49 changes: 33 additions & 16 deletions crates/store/re_data_loader/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ahash::{HashMap, HashMapExt};
use re_log_types::{FileSource, LogMsg};
use re_smart_channel::Sender;

use crate::{DataLoaderError, LoadedData};
use crate::{DataLoader, DataLoaderError, LoadedData, RrdLoader};

// ---

Expand Down Expand Up @@ -37,7 +37,7 @@ pub fn load_from_path(

let rx = load(settings, path, None)?;

send(settings.clone(), file_source, path.to_owned(), rx, tx);
send(settings.clone(), file_source, rx, tx);

Ok(())
}
Expand All @@ -64,7 +64,7 @@ pub fn load_from_file_contents(

let data = load(settings, filepath, Some(contents))?;

send(settings.clone(), file_source, filepath.to_owned(), data, tx);
send(settings.clone(), file_source, data, tx);

Ok(())
}
Expand All @@ -73,21 +73,20 @@ pub fn load_from_file_contents(

/// Prepares an adequate [`re_log_types::StoreInfo`] [`LogMsg`] given the input.
pub(crate) fn prepare_store_info(
application_id: re_log_types::ApplicationId,
store_id: &re_log_types::StoreId,
file_source: FileSource,
path: &std::path::Path,
) -> LogMsg {
re_tracing::profile_function!(path.display().to_string());
re_tracing::profile_function!();

use re_log_types::SetStoreInfo;

let app_id = re_log_types::ApplicationId(path.display().to_string());
let store_source = re_log_types::StoreSource::File { file_source };

LogMsg::SetStoreInfo(SetStoreInfo {
row_id: *re_chunk::RowId::new(),
info: re_log_types::StoreInfo {
application_id: app_id.clone(),
application_id,
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
Expand Down Expand Up @@ -263,14 +262,19 @@ pub(crate) fn load(
pub(crate) fn send(
settings: crate::DataLoaderSettings,
file_source: FileSource,
path: std::path::PathBuf,
rx_loader: std::sync::mpsc::Receiver<LoadedData>,
tx: &Sender<LogMsg>,
) {
spawn({
re_tracing::profile_function!();

let mut store_info_tracker: HashMap<re_log_types::StoreId, bool> = HashMap::new();
#[derive(Default, Debug)]
struct Tracked {
is_rrd_or_rbl: bool,
already_has_store_info: bool,
}

let mut store_info_tracker: HashMap<re_log_types::StoreId, Tracked> = HashMap::new();

let tx = tx.clone();
move || {
Expand All @@ -280,6 +284,7 @@ pub(crate) fn send(
// poll the channel in any case so as to make sure that the data producer
// doesn't get stuck.
for data in rx_loader {
let data_loader_name = data.data_loader_name().clone();
let msg = match data.into_log_msg() {
Ok(msg) => {
let store_info = match &msg {
Expand All @@ -293,7 +298,10 @@ pub(crate) fn send(
};

if let Some((store_id, store_info_created)) = store_info {
*store_info_tracker.entry(store_id).or_default() |= store_info_created;
let tracked = store_info_tracker.entry(store_id).or_default();
tracked.is_rrd_or_rbl =
*data_loader_name == RrdLoader::name(&RrdLoader);
tracked.already_has_store_info |= store_info_created;
}

msg
Expand All @@ -306,16 +314,25 @@ pub(crate) fn send(
tx.send(msg).ok();
}

for (store_id, store_info_already_created) in store_info_tracker {
for (store_id, tracked) in store_info_tracker {
let is_a_preexisting_recording =
Some(&store_id) == settings.opened_store_id.as_ref();

if store_info_already_created || is_a_preexisting_recording {
continue;
}
// Never try to send custom store info for RRDs and RBLs, they always have their own, and
// it's always right.
let should_force_store_info = settings.force_store_info && !tracked.is_rrd_or_rbl;

let should_send_new_store_info = should_force_store_info
|| (!tracked.already_has_store_info && !is_a_preexisting_recording);

let store_info = prepare_store_info(&store_id, file_source.clone(), &path);
tx.send(store_info).ok();
if should_send_new_store_info {
let app_id = settings
.opened_application_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string().into());
let store_info = prepare_store_info(app_id, &store_id, file_source.clone());
tx.send(store_info).ok();
}
}

tx.quit(None).ok();
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_data_loader/src/loader_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl DataLoader for ArchetypeLoader {
.clone()
.unwrap_or_else(|| settings.store_id.clone());
for row in rows {
let data = LoadedData::Chunk(store_id.clone(), row);
let data = LoadedData::Chunk(Self::name(&Self), store_id.clone(), row);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_data_loader/src/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use ahash::HashMap;
use once_cell::sync::Lazy;

use crate::LoadedData;
use crate::{DataLoader, LoadedData};

// ---

Expand Down Expand Up @@ -321,7 +321,7 @@ fn decode_and_stream<R: std::io::Read>(
}
};

let data = LoadedData::LogMsg(msg);
let data = LoadedData::LogMsg(ExternalLoader::name(&ExternalLoader), msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
Expand Down
17 changes: 13 additions & 4 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use re_log_encoding::decoder::Decoder;
use crossbeam::channel::Receiver;
use re_log_types::{ApplicationId, StoreId};

use crate::LoadedData;
use crate::{DataLoader as _, LoadedData};

// ---

Expand Down Expand Up @@ -130,12 +130,21 @@ impl crate::DataLoader for RrdLoader {
},
};

// * We never want to patch blueprints' store IDs, only their app IDs.
// * We never use import semantics at all for .rrd files.
let forced_application_id = if extension == "rbl" {
settings.opened_application_id.as_ref()
} else {
None
};
let forced_recording_id = None;

decode_and_stream(
&filepath,
&tx,
decoder,
settings.opened_application_id.as_ref(),
settings.opened_store_id.as_ref(),
forced_application_id,
forced_recording_id,
);

Ok(())
Expand Down Expand Up @@ -192,7 +201,7 @@ fn decode_and_stream<R: std::io::Read>(
msg
};

let data = LoadedData::LogMsg(msg);
let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg);
if tx.send(data).is_err() {
break; // The other end has decided to hang up, not our problem.
}
Expand Down
3 changes: 3 additions & 0 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ impl DataSource {
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
force_store_info: file_source.force_store_info(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_path(&settings, file_source, &path, &tx)
Expand Down Expand Up @@ -206,6 +207,7 @@ impl DataSource {
let settings = re_data_loader::DataLoaderSettings {
opened_application_id: file_source.recommended_application_id().cloned(),
opened_store_id: file_source.recommended_recording_id().cloned(),
force_store_info: file_source.force_store_info(),
..re_data_loader::DataLoaderSettings::recommended(shared_store_id)
};
re_data_loader::load_from_file_contents(
Expand Down Expand Up @@ -275,6 +277,7 @@ fn test_data_source_from_uri() {
let file_source = FileSource::DragAndDrop {
recommended_application_id: None,
recommended_recording_id: None,
force_store_info: false,
};

for uri in file {
Expand Down
31 changes: 31 additions & 0 deletions crates/store/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,21 +413,39 @@ pub enum FileSource {
DragAndDrop {
/// The [`ApplicationId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_application_id: Option<ApplicationId>,

/// The [`StoreId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_recording_id: Option<StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
#[cfg_attr(feature = "serde", serde(skip))]
force_store_info: bool,
},

FileDialog {
/// The [`ApplicationId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_application_id: Option<ApplicationId>,

/// The [`StoreId`] that the viewer heuristically recommends should be used when loading
/// this data source, based on the surrounding context.
#[cfg_attr(feature = "serde", serde(skip))]
recommended_recording_id: Option<StoreId>,

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Only useful when creating a recording just-in-time directly in the viewer (which is what
/// happens when importing things into the welcome screen).
#[cfg_attr(feature = "serde", serde(skip))]
force_store_info: bool,
},

Sdk,
Expand Down Expand Up @@ -463,6 +481,19 @@ impl FileSource {
Self::Cli | Self::Sdk => None,
}
}

/// Whether `SetStoreInfo`s should be sent, regardless of the surrounding context.
///
/// Returns the `force_store_info` flag carried by the `FileDialog` and `DragAndDrop`
/// variants; always `false` for `Cli` and `Sdk`, which carry no such flag.
#[inline]
pub fn force_store_info(&self) -> bool {
match self {
Self::FileDialog {
force_store_info, ..
}
| Self::DragAndDrop {
force_store_info, ..
} => *force_store_info,
Self::Cli | Self::Sdk => false,
}
}
}

/// The source of a recording or blueprint.
Expand Down
1 change: 1 addition & 0 deletions crates/top/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,7 @@ impl RecordingStream {
opened_application_id: None,
store_id: store_info.store_id.clone(),
opened_store_id: None,
force_store_info: false,
entity_path_prefix,
timepoint: (!static_).then(|| {
self.with(|inner| {
Expand Down
Loading

0 comments on commit 9a5b36a

Please sign in to comment.