Skip to content

Commit

Permalink
rr.log_file_from_path now defaults to the active app/recording ID
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Oct 22, 2024
1 parent fe6e3b6 commit a86ff18
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 12 deletions.
70 changes: 63 additions & 7 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use re_log_encoding::decoder::Decoder;

#[cfg(not(target_arch = "wasm32"))]
use crossbeam::channel::Receiver;
use re_log_types::{ApplicationId, StoreId};

// ---

Expand All @@ -17,8 +18,7 @@ impl crate::DataLoader for RrdLoader {
#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
// NOTE: The Store ID comes from the rrd file itself.
_settings: &crate::DataLoaderSettings,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand Down Expand Up @@ -57,12 +57,21 @@ impl crate::DataLoader for RrdLoader {
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
let settings = settings.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
decode_and_stream(
&filepath,
&tx,
decoder,
settings.opened_application_id.as_ref(),
// We never want to patch blueprints' store IDs, only their app IDs.
None,
);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;
}

"rrd" => {
// For .rrd files we retry reading despite reaching EOF to support live (writer) streaming.
// Decoder will give up when it sees end of file marker (i.e. end-of-stream message header)
Expand All @@ -76,8 +85,15 @@ impl crate::DataLoader for RrdLoader {
.name(format!("decode_and_stream({filepath:?})"))
.spawn({
let filepath = filepath.clone();
let settings = settings.clone();
move || {
decode_and_stream(&filepath, &tx, decoder);
decode_and_stream(
&filepath,
&tx,
decoder,
settings.opened_application_id.as_ref(),
settings.opened_store_id.as_ref(),
);
}
})
.with_context(|| format!("Failed to open spawn IO thread for {filepath:?}"))?;
Expand All @@ -90,8 +106,7 @@ impl crate::DataLoader for RrdLoader {

fn load_from_file_contents(
&self,
// NOTE: The Store ID comes from the rrd file itself.
_settings: &crate::DataLoaderSettings,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
Expand All @@ -116,7 +131,13 @@ impl crate::DataLoader for RrdLoader {
},
};

decode_and_stream(&filepath, &tx, decoder);
decode_and_stream(
&filepath,
&tx,
decoder,
settings.opened_application_id.as_ref(),
settings.opened_store_id.as_ref(),
);

Ok(())
}
Expand All @@ -126,6 +147,8 @@ fn decode_and_stream<R: std::io::Read>(
filepath: &std::path::Path,
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
decoder: Decoder<R>,
forced_application_id: Option<&ApplicationId>,
forced_store_id: Option<&StoreId>,
) {
re_tracing::profile_function!(filepath.display().to_string());

Expand All @@ -137,6 +160,39 @@ fn decode_and_stream<R: std::io::Read>(
continue;
}
};

let msg = if forced_application_id.is_some() || forced_store_id.is_some() {
match msg {
re_log_types::LogMsg::SetStoreInfo(set_store_info) => {
re_log_types::LogMsg::SetStoreInfo(re_log_types::SetStoreInfo {
info: re_log_types::StoreInfo {
application_id: forced_application_id
.cloned()
.unwrap_or(set_store_info.info.application_id),
store_id: forced_store_id
.cloned()
.unwrap_or(set_store_info.info.store_id),
..set_store_info.info
},
..set_store_info
})
}

re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => {
re_log_types::LogMsg::ArrowMsg(
forced_store_id.cloned().unwrap_or(store_id),
arrow_msg,
)
}

re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => {
re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command)
}
}
} else {
msg
};

if tx.send(msg.into()).is_err() {
break; // The other end has decided to hang up, not our problem.
}
Expand Down
19 changes: 14 additions & 5 deletions crates/top/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ impl RecordingStream {
entity_path_prefix: Option<EntityPath>,
static_: bool,
) -> RecordingStreamResult<()> {
self.log_file(filepath, None, entity_path_prefix, static_)
self.log_file(filepath, None, entity_path_prefix, static_, true)
}

/// Logs the given `contents` using all [`re_data_loader::DataLoader`]s available.
Expand All @@ -1138,16 +1138,20 @@ impl RecordingStream {
entity_path_prefix: Option<EntityPath>,
static_: bool,
) -> RecordingStreamResult<()> {
self.log_file(filepath, Some(contents), entity_path_prefix, static_)
self.log_file(filepath, Some(contents), entity_path_prefix, static_, true)
}

/// If `prefer_current_recording` is set (which is always the case for now), the dataloader settings
/// will be configured as if the current SDK recording is the currently opened recording.
/// Most dataloaders prefer logging to the currently opened recording if one is set.
#[cfg(feature = "data_loaders")]
fn log_file(
&self,
filepath: impl AsRef<std::path::Path>,
contents: Option<std::borrow::Cow<'_, [u8]>>,
entity_path_prefix: Option<EntityPath>,
static_: bool,
prefer_current_recording: bool,
) -> RecordingStreamResult<()> {
let Some(store_info) = self.store_info().clone() else {
re_log::warn!("Ignored call to log_file() because RecordingStream has not been properly initialized");
Expand All @@ -1162,10 +1166,10 @@ impl RecordingStream {
re_smart_channel::SmartChannelSource::File(filepath.into()),
);

let settings = crate::DataLoaderSettings {
let mut settings = crate::DataLoaderSettings {
application_id: Some(store_info.application_id.clone()),
opened_application_id: None,
store_id: store_info.store_id,
store_id: store_info.store_id.clone(),
opened_store_id: None,
entity_path_prefix,
timepoint: (!static_).then(|| {
Expand All @@ -1183,9 +1187,14 @@ impl RecordingStream {
now
})
.unwrap_or_default()
}), // timepoint: self.time,
}),
};

if prefer_current_recording {
settings.opened_application_id = Some(store_info.application_id.clone());
settings.opened_store_id = Some(store_info.store_id);
}

if let Some(contents) = contents {
re_data_loader::load_from_file_contents(
&settings,
Expand Down

0 comments on commit a86ff18

Please sign in to comment.