diff --git a/Cargo.lock b/Cargo.lock index fc3939a4eb8e3..89ff3e18e9070 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4627,6 +4627,7 @@ dependencies = [ "rand", "re_build_info", "re_build_tools", + "re_data_source", "re_data_store", "re_log", "re_log_encoding", diff --git a/crates/re_data_source/src/load_file.rs b/crates/re_data_source/src/load_file.rs index 8a5e06ab88c5e..b9cf33cb877d4 100644 --- a/crates/re_data_source/src/load_file.rs +++ b/crates/re_data_source/src/load_file.rs @@ -27,7 +27,7 @@ pub fn load_from_path( if !path.exists() { return Err(std::io::Error::new( std::io::ErrorKind::NotFound, - "path does not exist: {path:?}", + format!("path does not exist: {path:?}"), ) .into()); } diff --git a/crates/re_log_types/src/lib.rs b/crates/re_log_types/src/lib.rs index a0788451545e1..0bc8f6a350b5b 100644 --- a/crates/re_log_types/src/lib.rs +++ b/crates/re_log_types/src/lib.rs @@ -312,6 +312,7 @@ pub enum FileSource { Cli, DragAndDrop, FileDialog, + Sdk, } /// The source of a recording or blueprint. @@ -358,6 +359,7 @@ impl std::fmt::Display for StoreSource { FileSource::Cli => write!(f, "File via CLI"), FileSource::DragAndDrop => write!(f, "File via drag-and-drop"), FileSource::FileDialog => write!(f, "File via file dialog"), + FileSource::Sdk => write!(f, "File via SDK"), }, Self::Viewer => write!(f, "Viewer-generated"), Self::Other(string) => format!("{string:?}").fmt(f), // put it in quotes diff --git a/crates/re_sdk/Cargo.toml b/crates/re_sdk/Cargo.toml index c6073a4f83e15..833d3eb3f81df 100644 --- a/crates/re_sdk/Cargo.toml +++ b/crates/re_sdk/Cargo.toml @@ -31,7 +31,6 @@ default = [] ## However, when building from source in the repository, this feature adds quite a bit ## to the compile time since it requires compiling and bundling the viewer as wasm. 
web_viewer = [ - "dep:re_smart_channel", "dep:re_web_viewer_server", "dep:re_ws_comms", "dep:anyhow", @@ -42,11 +41,13 @@ web_viewer = [ [dependencies] re_build_info.workspace = true +re_data_source.workspace = true re_log_encoding = { workspace = true, features = ["encoder"] } re_log_types.workspace = true re_log.workspace = true re_memory.workspace = true re_sdk_comms = { workspace = true, features = ["client"] } +re_smart_channel.workspace = true re_types_core.workspace = true ahash.workspace = true @@ -58,7 +59,6 @@ thiserror.workspace = true # Optional dependencies -re_smart_channel = { workspace = true, optional = true } re_ws_comms = { workspace = true, optional = true } re_web_viewer_server = { workspace = true, optional = true } diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs index c1255be696f60..a4cc0ff22d8dd 100644 --- a/crates/re_sdk/src/recording_stream.rs +++ b/crates/re_sdk/src/recording_stream.rs @@ -5,6 +5,7 @@ use std::sync::{atomic::AtomicI64, Arc}; use ahash::HashMap; use crossbeam::channel::{Receiver, Sender}; +use parking_lot::Mutex; use re_log_types::{ ApplicationId, ArrowChunkReleaseCallback, DataCell, DataCellError, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId, @@ -61,7 +62,7 @@ pub enum RecordingStreamError { #[error("Failed to spawn background thread '{name}': {err}")] SpawnThread { /// Name of the thread - name: &'static str, + name: String, /// Inner error explaining why the thread failed to spawn. err: std::io::Error, @@ -79,6 +80,10 @@ pub enum RecordingStreamError { /// An error that can occur because a row in the store has inconsistent columns. #[error(transparent)] DataReadError(#[from] re_log_types::DataReadError), + + /// An error occurred when attempting to use a [`re_data_source::DataLoader`]. 
+ #[error(transparent)] + DataLoaderError(#[from] re_data_source::DataLoaderError), } /// Results that can occur when creating/manipulating a [`RecordingStream`]. @@ -623,6 +628,12 @@ struct RecordingStreamInner { batcher: DataTableBatcher, batcher_to_sink_handle: Option>, + /// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader` + /// machinery in the context of this `RecordingStream`. + /// + /// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`]. + dataloader_handles: Mutex>>, + pid_at_creation: u32, } @@ -633,6 +644,16 @@ impl Drop for RecordingStreamInner { return; } + // Run all pending top-level `DataLoader` threads that were started from the SDK to completion. + // + // TODO(cmc): At some point we might want to make it configurable, though I cannot really + // think of a use case where you'd want to drop those threads immediately upon + // disconnection. + let dataloader_handles = std::mem::take(&mut *self.dataloader_handles.lock()); + for handle in dataloader_handles { + handle.join().ok(); + } + // NOTE: The command channel is private, if we're here, nothing is currently capable of // sending data down the pipeline. self.batcher.flush_blocking(); @@ -679,7 +700,10 @@ impl RecordingStreamInner { let batcher = batcher.clone(); move || forwarding_thread(info, sink, cmds_rx, batcher.tables(), on_release) }) - .map_err(|err| RecordingStreamError::SpawnThread { name: NAME, err })? + .map_err(|err| RecordingStreamError::SpawnThread { + name: NAME.into(), + err, + })? }; Ok(RecordingStreamInner { @@ -688,6 +712,7 @@ impl RecordingStreamInner { cmds_tx, batcher, batcher_to_sink_handle: Some(batcher_to_sink_handle), + dataloader_handles: Mutex::new(Vec::new()), pid_at_creation: std::process::id(), }) } @@ -989,6 +1014,100 @@ impl RecordingStream { Ok(()) } + + /// Logs the file at the given `path` using all [`re_data_source::DataLoader`]s available. 
+ /// + /// A single `path` might be handled by more than one loader. + /// + /// This method blocks until either at least one [`re_data_source::DataLoader`] starts + /// streaming data in or all of them fail. + /// + /// See for more information. + pub fn log_file_from_path( + &self, + filepath: impl AsRef, + ) -> RecordingStreamResult<()> { + self.log_file(filepath, None) + } + + /// Logs the given `contents` using all [`re_data_source::DataLoader`]s available. + /// + /// A single `path` might be handled by more than one loader. + /// + /// This method blocks until either at least one [`re_data_source::DataLoader`] starts + /// streaming data in or all of them fail. + /// + /// See for more information. + pub fn log_file_from_contents( + &self, + filepath: impl AsRef, + contents: std::borrow::Cow<'_, [u8]>, + ) -> RecordingStreamResult<()> { + self.log_file(filepath, Some(contents)) + } + + fn log_file( + &self, + filepath: impl AsRef, + contents: Option>, + ) -> RecordingStreamResult<()> { + let filepath = filepath.as_ref(); + let has_contents = contents.is_some(); + + let (tx, rx) = re_smart_channel::smart_channel( + re_smart_channel::SmartMessageSource::Sdk, + re_smart_channel::SmartChannelSource::File(filepath.into()), + ); + + let Some(store_id) = &self.store_info().map(|info| info.store_id.clone()) else { + // There's no recording. + return Ok(()); + }; + if let Some(contents) = contents { + re_data_source::load_from_file_contents( + store_id, + re_log_types::FileSource::Sdk, + filepath, + contents, + &tx, + )?; + } else { + re_data_source::load_from_path(store_id, re_log_types::FileSource::Sdk, filepath, &tx)?; + } + drop(tx); + + // We can safely ignore the error on `recv()` as we're in complete control of both ends of + // the channel. 
+ let thread_name = if has_contents { + format!("log_file_from_contents({filepath:?})") + } else { + format!("log_file_from_path({filepath:?})") + }; + let handle = std::thread::Builder::new() + .name(thread_name.clone()) + .spawn({ + let this = self.clone(); + move || { + while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) { + this.record_msg(msg); + } + } + }) + .map_err(|err| RecordingStreamError::SpawnThread { + name: thread_name, + err, + })?; + + debug_assert!( + self.inner.is_some(), + "recording should always be fully init at this stage" + ); + if let Some(inner) = self.inner.as_ref() { + inner.dataloader_handles.lock().push(handle); + } + + Ok(()) + } } #[allow(clippy::needless_pass_by_value)] @@ -1450,6 +1569,22 @@ impl RecordingStream { /// terms of data durability and ordering. /// See [`Self::set_sink`] for more information. pub fn disconnect(&self) { + let Some(this) = &*self.inner else { + re_log::warn_once!("Recording disabled - call to disconnect() ignored"); + return; + }; + + // When disconnecting, we need to make sure that pending top-level `DataLoader` threads that + // were started from the SDK run to completion. + // + // TODO(cmc): At some point we might want to make it configurable, though I cannot really + // think of a use case where you'd want to drop those threads immediately upon + // disconnection. 
+ let dataloader_handles = std::mem::take(&mut *this.dataloader_handles.lock()); + for handle in dataloader_handles { + handle.join().ok(); + } + self.set_sink(Box::new(crate::sink::BufferedSink::new())); } } @@ -1465,11 +1600,13 @@ impl fmt::Debug for RecordingStream { cmds_tx: _, batcher: _, batcher_to_sink_handle: _, + dataloader_handles, pid_at_creation, }) => f .debug_struct("RecordingStream") .field("info", &info) .field("tick", &tick) + .field("pending_dataloaders", &dataloader_handles.lock().len()) .field("pid_at_creation", &pid_at_creation) .finish_non_exhaustive(), None => write!(f, "RecordingStream {{ disabled }}"), diff --git a/crates/re_viewer/src/viewer_analytics/event.rs b/crates/re_viewer/src/viewer_analytics/event.rs index 239882120cf4f..de27abf4c7101 100644 --- a/crates/re_viewer/src/viewer_analytics/event.rs +++ b/crates/re_viewer/src/viewer_analytics/event.rs @@ -66,6 +66,7 @@ pub fn open_recording( re_log_types::FileSource::Cli => "file_cli".to_owned(), re_log_types::FileSource::DragAndDrop => "file_drag_and_drop".to_owned(), re_log_types::FileSource::FileDialog => "file_dialog".to_owned(), + re_log_types::FileSource::Sdk => "file_sdk".to_owned(), }, S::Viewer => "viewer".to_owned(), S::Other(other) => other.clone(),