diff --git a/Cargo.lock b/Cargo.lock
index 60076725fe56..86893f258add 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7280,6 +7280,7 @@ dependencies = [
  "re_chunk",
  "re_chunk_store",
  "re_dataframe",
+ "re_grpc_client",
  "re_log",
  "re_log_encoding",
  "re_log_types",
diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs
index f324862dc3a8..4014d6b974b2 100644
--- a/crates/store/re_grpc_client/src/lib.rs
+++ b/crates/store/re_grpc_client/src/lib.rs
@@ -38,7 +38,7 @@ use re_protos::remote_store::v0::{
 /// Wrapper with a nicer error message
 #[derive(Debug)]
-struct TonicStatusError(tonic::Status);
+pub struct TonicStatusError(pub tonic::Status);
 
 impl std::fmt::Display for TonicStatusError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -65,7 +65,7 @@ impl Error for TonicStatusError {
 }
 
 #[derive(thiserror::Error, Debug)]
-enum StreamError {
+pub enum StreamError {
     /// Native connection error
     #[cfg(not(target_arch = "wasm32"))]
     #[error(transparent)]
@@ -268,7 +268,7 @@ async fn stream_recording_async(
     Ok(())
 }
 
-fn store_info_from_catalog_chunk(
+pub fn store_info_from_catalog_chunk(
     tc: &TransportChunk,
     recording_id: &str,
 ) -> Result<StoreInfo, StreamError> {
diff --git a/crates/store/re_log_types/src/path/entity_path_filter.rs b/crates/store/re_log_types/src/path/entity_path_filter.rs
index 9f94525e9bcc..ce69f6ddd746 100644
--- a/crates/store/re_log_types/src/path/entity_path_filter.rs
+++ b/crates/store/re_log_types/src/path/entity_path_filter.rs
@@ -587,6 +587,12 @@ impl EntityPathFilter {
         // inclusion rule and didn't hit an exclusion rule.
         true
     }
+
+    #[inline]
+    /// Iterate over all rules in the filter.
+    pub fn rules(&self) -> impl Iterator<Item = (&EntityPathRule, &RuleEffect)> {
+        self.rules.iter()
+    }
 }
 
 impl EntityPathRule {
diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml
index 5357d7e53a8d..99cdf915700a 100644
--- a/rerun_py/Cargo.toml
+++ b/rerun_py/Cargo.toml
@@ -39,6 +39,7 @@ remote = [
   "dep:object_store",
   "dep:re_protos",
   "dep:re_ws_comms",
+  "dep:re_grpc_client",
   "dep:tokio",
   "dep:tokio-stream",
   "dep:tonic",
@@ -61,6 +62,7 @@ re_build_info.workspace = true
 re_chunk = { workspace = true, features = ["arrow"] }
 re_chunk_store.workspace = true
 re_dataframe.workspace = true
+re_grpc_client = { workspace = true, optional = true }
 re_log = { workspace = true, features = ["setup"] }
 re_log_encoding = { workspace = true }
 re_log_types.workspace = true
diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi
index bd046bf15069..2420615ec2d9 100644
--- a/rerun_py/rerun_bindings/rerun_bindings.pyi
+++ b/rerun_py/rerun_bindings/rerun_bindings.pyi
@@ -615,7 +615,27 @@ class StorageNodeClient:
         """
         Open a [`Recording`][rerun.dataframe.Recording] by id to use with the dataframe APIs.
 
-        This currently downloads the full recording to the local machine.
+        This will run queries against the remote storage node and stream the results. Faster for small
+        numbers of queries with small results.
+
+        Parameters
+        ----------
+        id : str
+            The id of the recording to open.
+
+        Returns
+        -------
+        Recording
+            The opened recording.
+
+        """
+        ...
+
+    def download_recording(self, id: str) -> Recording:
+        """
+        Download a [`Recording`][rerun.dataframe.Recording] by id to use with the dataframe APIs.
+
+        This will download the full recording to memory and run queries against a local chunk store.
Parameters ---------- diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 3f75a459039e..563322bba0c5 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -28,6 +28,9 @@ use re_log_encoding::VersionPolicy; use re_log_types::{EntityPathFilter, ResolvedTimeRange, TimeType}; use re_sdk::{ComponentName, EntityPath, StoreId, StoreKind}; +#[cfg(feature = "remote")] +use crate::remote::PyRemoteRecording; + /// Register the `rerun.dataframe` module. pub(crate) fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; @@ -427,7 +430,7 @@ impl IndexValuesLike<'_> { } } -struct ComponentLike(String); +pub struct ComponentLike(pub String); impl FromPyObject<'_> for ComponentLike { fn extract_bound(component: &Bound<'_, PyAny>) -> PyResult { @@ -570,6 +573,13 @@ pub struct PyRecording { pub(crate) cache: re_dataframe::QueryCacheHandle, } +#[derive(Clone)] +pub enum PyRecordingHandle { + Local(std::sync::Arc>), + #[cfg(feature = "remote")] + Remote(std::sync::Arc>), +} + /// A view of a recording restricted to a given index, containing a specific set of entities and components. /// /// See [`Recording.view(…)`][rerun.dataframe.Recording.view] for details on how to create a `RecordingView`. @@ -588,9 +598,9 @@ pub struct PyRecording { #[pyclass(name = "RecordingView")] #[derive(Clone)] pub struct PyRecordingView { - recording: std::sync::Arc>, + pub(crate) recording: PyRecordingHandle, - query_expression: QueryExpression, + pub(crate) query_expression: QueryExpression, } impl PyRecordingView { @@ -638,19 +648,27 @@ impl PyRecordingView { /// /// This schema will only contain the columns that are included in the view via /// the view contents. - fn schema(&self, py: Python<'_>) -> PySchema { - let borrowed = self.recording.borrow(py); - let engine = borrowed.engine(); + fn schema(&self, py: Python<'_>) -> PyResult { + match &self.recording { + PyRecordingHandle::Local(recording) => { + let borrowed: PyRef<'_, PyRecording> = recording.borrow(py); + let engine = borrowed.engine(); - let mut query_expression = self.query_expression.clone(); - query_expression.selection = None; + let mut query_expression = self.query_expression.clone(); + query_expression.selection = None; - let query_handle = engine.query(query_expression); + let query_handle = engine.query(query_expression); - let contents = query_handle.view_contents(); + let contents = query_handle.view_contents(); - PySchema { - schema: contents.to_vec(), + Ok(PySchema { + schema: contents.to_vec(), + }) + } + #[cfg(feature = "remote")] + PyRecordingHandle::Remote(_) => Err::<_, PyErr>(PyRuntimeError::new_err( + "Schema is not implemented for remote recordings yet.", + )), } } @@ -691,60 +709,72 @@ impl PyRecordingView { args: &Bound<'_, PyTuple>, columns: Option>, ) -> PyResult>> { - let borrowed = self.recording.borrow(py); - let engine = borrowed.engine(); - let mut query_expression = self.query_expression.clone(); - query_expression.selection = Self::select_args(args, columns)?; - let query_handle = engine.query(query_expression); + match &self.recording { + PyRecordingHandle::Local(recording) => { + let borrowed = recording.borrow(py); + let engine = borrowed.engine(); - // If the only contents found are static, we might need to warn the user since - // this means we won't naturally have any rows in the result. 
- let available_data_columns = query_handle - .view_contents() - .iter() - .filter(|c| matches!(c, ColumnDescriptor::Component(_))) - .collect::>(); - - // We only consider all contents static if there at least some columns - let all_contents_are_static = !available_data_columns.is_empty() - && available_data_columns.iter().all(|c| c.is_static()); - - // Additionally, we only want to warn if the user actually tried to select some - // of the static columns. Otherwise the fact that there are no results shouldn't - // be surprising. - let selected_data_columns = query_handle - .selected_contents() - .iter() - .map(|(_, col)| col) - .filter(|c| matches!(c, ColumnDescriptor::Component(_))) - .collect::>(); + let query_handle = engine.query(query_expression); - let any_selected_data_is_static = selected_data_columns.iter().any(|c| c.is_static()); - - if self.query_expression.using_index_values.is_none() - && all_contents_are_static - && any_selected_data_is_static - { - py_rerun_warn("RecordingView::select: tried to select static data, but no non-static contents generated an index value on this timeline. No results will be returned. Either include non-static data or consider using `select_static()` instead.")?; - } + // If the only contents found are static, we might need to warn the user since + // this means we won't naturally have any rows in the result. + let available_data_columns = query_handle + .view_contents() + .iter() + .filter(|c| matches!(c, ColumnDescriptor::Component(_))) + .collect::>(); + + // We only consider all contents static if there at least some columns + let all_contents_are_static = !available_data_columns.is_empty() + && available_data_columns.iter().all(|c| c.is_static()); + + // Additionally, we only want to warn if the user actually tried to select some + // of the static columns. Otherwise the fact that there are no results shouldn't + // be surprising. + let selected_data_columns = query_handle + .selected_contents() + .iter() + .map(|(_, col)| col) + .filter(|c| matches!(c, ColumnDescriptor::Component(_))) + .collect::>(); - let schema = query_handle.schema(); - let fields: Vec = - schema.fields.iter().map(|f| f.clone().into()).collect(); - let metadata = schema.metadata.clone().into_iter().collect(); - let schema = arrow::datatypes::Schema::new(fields).with_metadata(metadata); + let any_selected_data_is_static = + selected_data_columns.iter().any(|c| c.is_static()); - let reader = RecordBatchIterator::new( - query_handle - .into_batch_iter() - .map(|batch| batch.try_to_arrow_record_batch()), - std::sync::Arc::new(schema), - ); + if self.query_expression.using_index_values.is_none() + && all_contents_are_static + && any_selected_data_is_static + { + py_rerun_warn("RecordingView::select: tried to select static data, but no non-static contents generated an index value on this timeline. No results will be returned. 
Either include non-static data or consider using `select_static()` instead.")?; + } - Ok(PyArrowType(Box::new(reader))) + let schema = query_handle.schema(); + let fields: Vec = + schema.fields.iter().map(|f| f.clone().into()).collect(); + let metadata = schema.metadata.clone().into_iter().collect(); + let schema = arrow::datatypes::Schema::new(fields).with_metadata(metadata); + + let reader = RecordBatchIterator::new( + query_handle + .into_batch_iter() + .map(|batch| batch.try_to_arrow_record_batch()), + std::sync::Arc::new(schema), + ); + Ok(PyArrowType(Box::new(reader))) + } + #[cfg(feature = "remote")] + PyRecordingHandle::Remote(recording) => { + let borrowed_recording = recording.borrow(py); + let mut borrowed_client = borrowed_recording.client.borrow_mut(py); + borrowed_client.exec_query( + borrowed_recording.store_info.store_id.clone(), + query_expression, + ) + } + } } /// Select only the static columns from the view. @@ -780,54 +810,69 @@ impl PyRecordingView { args: &Bound<'_, PyTuple>, columns: Option>, ) -> PyResult>> { - let borrowed = self.recording.borrow(py); - let engine = borrowed.engine(); - let mut query_expression = self.query_expression.clone(); - // This is a static selection, so we clear the filtered index query_expression.filtered_index = None; // If no columns provided, select all static columns - let static_columns = Self::select_args(args, columns)?.unwrap_or_else(|| { - self.schema(py) - .schema - .iter() - .filter(|col| col.is_static()) - .map(|col| col.clone().into()) - .collect() - }); + let static_columns = Self::select_args(args, columns) + .transpose() + .unwrap_or_else(|| { + Ok(self + .schema(py)? + .schema + .iter() + .filter(|col| col.is_static()) + .map(|col| col.clone().into()) + .collect()) + })?; query_expression.selection = Some(static_columns); - let query_handle = engine.query(query_expression); + match &self.recording { + PyRecordingHandle::Local(recording) => { + let borrowed = recording.borrow(py); + let engine = borrowed.engine(); - let non_static_cols = query_handle - .selected_contents() - .iter() - .filter(|(_, col)| !col.is_static()) - .collect::>(); + let query_handle = engine.query(query_expression); - if !non_static_cols.is_empty() { - return Err(PyValueError::new_err(format!( - "Static selection resulted in non-static columns: {non_static_cols:?}", - ))); - } + let non_static_cols = query_handle + .selected_contents() + .iter() + .filter(|(_, col)| !col.is_static()) + .collect::>(); - let schema = query_handle.schema(); - let fields: Vec = - schema.fields.iter().map(|f| f.clone().into()).collect(); - let metadata = schema.metadata.clone().into_iter().collect(); - let schema = arrow::datatypes::Schema::new(fields).with_metadata(metadata); + if !non_static_cols.is_empty() { + return Err(PyValueError::new_err(format!( + "Static selection resulted in non-static columns: {non_static_cols:?}", + ))); + } - let reader = RecordBatchIterator::new( - query_handle - .into_batch_iter() - .map(|batch| batch.try_to_arrow_record_batch()), - std::sync::Arc::new(schema), - ); + let schema = query_handle.schema(); + let fields: Vec = + schema.fields.iter().map(|f| f.clone().into()).collect(); + let metadata = schema.metadata.clone().into_iter().collect(); + let schema = arrow::datatypes::Schema::new(fields).with_metadata(metadata); + + let reader = RecordBatchIterator::new( + query_handle + .into_batch_iter() + .map(|batch| batch.try_to_arrow_record_batch()), + std::sync::Arc::new(schema), + ); - Ok(PyArrowType(Box::new(reader))) + 
Ok(PyArrowType(Box::new(reader))) + } + #[cfg(feature = "remote")] + PyRecordingHandle::Remote(recording) => { + let borrowed_recording = recording.borrow(py); + let mut borrowed_client = borrowed_recording.client.borrow_mut(py); + borrowed_client.exec_query( + borrowed_recording.store_info.store_id.clone(), + query_expression, + ) + } + } } #[allow(rustdoc::private_doc_tests)] @@ -1336,7 +1381,7 @@ impl PyRecording { let recording = slf.unbind(); Ok(PyRecordingView { - recording: std::sync::Arc::new(recording), + recording: PyRecordingHandle::Local(std::sync::Arc::new(recording)), query_expression: query, }) } diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 2619d3990088..414d116bb698 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -1,4 +1,7 @@ #![allow(unsafe_op_in_unsafe_fn)] + +use std::collections::BTreeSet; + use arrow::{ array::{RecordBatch, RecordBatchIterator, RecordBatchReader}, datatypes::Schema, @@ -6,23 +9,30 @@ use arrow::{ pyarrow::PyArrowType, }; // False positive due to #[pyfunction] macro -use pyo3::{exceptions::PyRuntimeError, prelude::*, Bound, PyResult}; +use pyo3::{ + exceptions::{PyRuntimeError, PyTypeError, PyValueError}, + prelude::*, + types::PyDict, + Bound, PyResult, +}; use re_chunk::{Chunk, TransportChunk}; use re_chunk_store::ChunkStore; -use re_dataframe::ChunkStoreHandle; +use re_dataframe::{ChunkStoreHandle, QueryExpression, SparseFillStrategy, ViewContentsSelector}; +use re_grpc_client::TonicStatusError; use re_log_encoding::codec::wire::{decode, encode}; -use re_log_types::{StoreInfo, StoreSource}; +use re_log_types::{EntityPathFilter, StoreInfo, StoreSource}; use re_protos::{ common::v0::{EncoderVersion, RecordingId}, remote_store::v0::{ - storage_node_client::StorageNodeClient, DataframePart, FetchRecordingRequest, - QueryCatalogRequest, RecordingType, RegisterRecordingRequest, UpdateCatalogRequest, + storage_node_client::StorageNodeClient, CatalogFilter, DataframePart, + FetchRecordingRequest, QueryCatalogRequest, QueryRequest, RecordingType, + RegisterRecordingRequest, UpdateCatalogRequest, }, }; -use re_sdk::{ApplicationId, StoreId, StoreKind, Time}; +use re_sdk::{ApplicationId, ComponentName, StoreId, StoreKind, Time, Timeline}; use tokio_stream::StreamExt; -use crate::dataframe::PyRecording; +use crate::dataframe::{ComponentLike, PyRecording, PyRecordingHandle, PyRecordingView}; /// Register the `rerun.remote` module. pub(crate) fn register(m: &Bound<'_, PyModule>) -> PyResult<()> { @@ -80,6 +90,106 @@ pub struct PyStorageNodeClient { client: StorageNodeClient, } +impl PyStorageNodeClient { + /// Get the [`StoreInfo`] for a single recording in the storage node. + fn get_store_info(&mut self, id: &str) -> PyResult { + let store_info = self + .runtime + .block_on(async { + let resp = self + .client + .query_catalog(QueryCatalogRequest { + column_projection: None, // fetch all columns + filter: Some(CatalogFilter { + recording_ids: vec![RecordingId { id: id.to_owned() }], + }), + }) + .await + .map_err(re_grpc_client::TonicStatusError)? 
+ .into_inner() + .filter_map(|resp| { + resp.and_then(|r| { + decode(r.encoder_version(), &r.payload) + .map_err(|err| tonic::Status::internal(err.to_string())) + }) + .transpose() + }) + .collect::, tonic::Status>>() + .await + .map_err(re_grpc_client::TonicStatusError)?; + + if resp.len() != 1 || resp[0].num_rows() != 1 { + return Err(re_grpc_client::StreamError::ChunkError( + re_chunk::ChunkError::Malformed { + reason: format!( + "expected exactly one recording with id {id}, got {}", + resp.len() + ), + }, + )); + } + + re_grpc_client::store_info_from_catalog_chunk(&resp[0], id) + }) + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; + + Ok(store_info) + } + + /// Execute a [`QueryExpression`] for a single recording in the storage node. + pub(crate) fn exec_query( + &mut self, + id: StoreId, + query: QueryExpression, + ) -> PyResult>> { + let query: re_protos::common::v0::Query = query.into(); + + let batches = self.runtime.block_on(async { + // TODO(#8536): Avoid the need to collect here. + // This means we shouldn't be blocking on + let batches = self + .client + .query(QueryRequest { + recording_id: Some(id.into()), + query: Some(query.clone()), + }) + .await + .map_err(TonicStatusError)? + .into_inner() + .filter_map(|resp| { + resp.and_then(|r| { + decode(r.encoder_version(), &r.payload) + .map_err(|err| tonic::Status::internal(err.to_string())) + }) + .transpose() + }) + .collect::, tonic::Status>>() + .await + .map_err(TonicStatusError)?; + + let schema = batches + .first() + .map(|batch| batch.schema.clone()) + .unwrap_or_else(|| arrow2::datatypes::Schema::from(vec![])); + + let fields: Vec = + schema.fields.iter().map(|f| f.clone().into()).collect(); + let metadata = schema.metadata.clone().into_iter().collect(); + let schema = arrow::datatypes::Schema::new(fields).with_metadata(metadata); + + Ok(RecordBatchIterator::new( + batches.into_iter().map(|tc| tc.try_to_arrow_record_batch()), + std::sync::Arc::new(schema), + )) + }); + + let result = + batches.map_err(|err: TonicStatusError| PyRuntimeError::new_err(err.to_string()))?; + + Ok(PyArrowType(Box::new(result))) + } +} + #[pymethods] impl PyStorageNodeClient { /// Get the metadata for all recordings in the storage node. @@ -250,7 +360,37 @@ impl PyStorageNodeClient { /// Open a [`Recording`][rerun.dataframe.Recording] by id to use with the dataframe APIs. /// - /// This currently downloads the full recording to the local machine. + /// This will run queries against the remote storage node and stream the results. Faster for small + /// numbers of queries with small results. + /// + /// Parameters + /// ---------- + /// id : str + /// The id of the recording to open. + /// + /// Returns + /// ------- + /// Recording + /// The opened recording. + #[pyo3(signature = ( + id, + ))] + fn open_recording(slf: Bound<'_, Self>, id: &str) -> PyResult { + let mut borrowed_self = slf.borrow_mut(); + + let store_info = borrowed_self.get_store_info(id)?; + + let client = slf.unbind(); + + Ok(PyRemoteRecording { + client: std::sync::Arc::new(client), + store_info, + }) + } + + /// Download a [`Recording`][rerun.dataframe.Recording] by id to use with the dataframe APIs. + /// + /// This will download the full recording to memory and run queries against a local chunk store. 
/// /// Parameters /// ---------- @@ -264,7 +404,7 @@ impl PyStorageNodeClient { #[pyo3(signature = ( id, ))] - fn open_recording(&mut self, id: &str) -> PyResult { + fn download_recording(&mut self, id: &str) -> PyResult { use tokio_stream::StreamExt as _; let store = self.runtime.block_on(async { let mut resp = self @@ -347,3 +487,229 @@ impl MetadataLike { .map_err(|err| PyRuntimeError::new_err(err.to_string())) } } + +/// A single Rerun recording. +/// +/// This can be loaded from an RRD file using [`load_recording()`][rerun.dataframe.load_recording]. +/// +/// A recording is a collection of data that was logged to Rerun. This data is organized +/// as a column for each index (timeline) and each entity/component pair that was logged. +/// +/// You can examine the [`.schema()`][rerun.dataframe.Recording.schema] of the recording to see +/// what data is available, or create a [`RecordingView`][rerun.dataframe.RecordingView] to +/// to retrieve the data. +#[pyclass(name = "RemoteRecording")] +pub struct PyRemoteRecording { + pub(crate) client: std::sync::Arc>, + pub(crate) store_info: StoreInfo, +} + +impl PyRemoteRecording { + /// Convert a `ViewContentsLike` into a `ViewContentsSelector`. + /// + /// ```python + /// ViewContentsLike = Union[str, Dict[str, Union[ComponentLike, Sequence[ComponentLike]]]] + /// ``` + /// + // TODO(jleibs): This needs access to the schema to resolve paths and components + fn extract_contents_expr( + expr: &Bound<'_, PyAny>, + ) -> PyResult { + if let Ok(expr) = expr.extract::() { + let path_filter = + EntityPathFilter::parse_strict(&expr, &Default::default()).map_err(|err| { + PyValueError::new_err(format!( + "Could not interpret `contents` as a ViewContentsLike. Failed to parse {expr}: {err}.", + )) + })?; + + for (rule, _) in path_filter.rules() { + if rule.include_subtree { + return Err(PyValueError::new_err( + "SubTree path expressions (/**) are not allowed yet for remote recordings.", + )); + } + } + + // Since these are all exact rules, just include them directly + let contents = path_filter + .rules() + .map(|(rule, _)| (rule.path.clone(), None)) + .collect(); + + Ok(contents) + } else if let Ok(dict) = expr.downcast::() { + // `Union[ComponentLike, Sequence[ComponentLike]]]` + + let mut contents = ViewContentsSelector::default(); + + for (key, value) in dict { + let key = key.extract::().map_err(|_err| { + PyTypeError::new_err( + format!("Could not interpret `contents` as a ViewContentsLike. Key: {key} is not a path expression."), + ) + })?; + + let path_filter = EntityPathFilter::parse_strict(&key, &Default::default()).map_err(|err| { + PyValueError::new_err(format!( + "Could not interpret `contents` as a ViewContentsLike. Failed to parse {key}: {err}.", + )) + })?; + + for (rule, _) in path_filter.rules() { + if rule.include_subtree { + return Err(PyValueError::new_err( + "SubTree path expressions (/**) are not allowed yet for remote recordings.", + )); + } + } + + let component_strs: BTreeSet = if let Ok(component) = + value.extract::() + { + std::iter::once(component.0).collect() + } else if let Ok(components) = value.extract::>() { + components.into_iter().map(|c| c.0).collect() + } else { + return Err(PyTypeError::new_err( + format!("Could not interpret `contents` as a ViewContentsLike. 
Value: {value} is not a ComponentLike or Sequence[ComponentLike]."), + )); + }; + + contents.append( + &mut path_filter + .rules() + .map(|(rule, _)| { + let components = component_strs + .iter() + .map(|component_name| ComponentName::from(component_name.clone())) + .collect(); + (rule.path.clone(), Some(components)) + }) + .collect(), + ); + } + + Ok(contents) + } else { + return Err(PyTypeError::new_err( + "Could not interpret `contents` as a ViewContentsLike. Top-level type must be a string or a dictionary.", + )); + } + } +} + +#[pymethods] +impl PyRemoteRecording { + #[allow(rustdoc::private_doc_tests, rustdoc::invalid_rust_codeblocks)] + /// Create a [`RecordingView`][rerun.dataframe.RecordingView] of the recording according to a particular index and content specification. + /// + /// The only type of index currently supported is the name of a timeline. + /// + /// The view will only contain a single row for each unique value of the index + /// that is associated with a component column that was included in the view. + /// Component columns that are not included via the view contents will not + /// impact the rows that make up the view. If the same entity / component pair + /// was logged to a given index multiple times, only the most recent row will be + /// included in the view, as determined by the `row_id` column. This will + /// generally be the last value logged, as row_ids are guaranteed to be + /// monotonically increasing when data is sent from a single process. + /// + /// Parameters + /// ---------- + /// index : str + /// The index to use for the view. This is typically a timeline name. + /// contents : ViewContentsLike + /// The content specification for the view. + /// + /// This can be a single string content-expression such as: `"world/cameras/**"`, or a dictionary + /// specifying multiple content-expressions and a respective list of components to select within + /// that expression such as `{"world/cameras/**": ["ImageBuffer", "PinholeProjection"]}`. + /// include_semantically_empty_columns : bool, optional + /// Whether to include columns that are semantically empty, by default `False`. + /// + /// Semantically empty columns are components that are `null` or empty `[]` for every row in the recording. + /// include_indicator_columns : bool, optional + /// Whether to include indicator columns, by default `False`. + /// + /// Indicator columns are components used to represent the presence of an archetype within an entity. + /// include_tombstone_columns : bool, optional + /// Whether to include tombstone columns, by default `False`. + /// + /// Tombstone columns are components used to represent clears. However, even without the clear + /// tombstone columns, the view will still apply the clear semantics when resolving row contents. + /// + /// Returns + /// ------- + /// RecordingView + /// The view of the recording. 
+ /// + /// Examples + /// -------- + /// All the data in the recording on the timeline "my_index": + /// ```python + /// recording.view(index="my_index", contents="/**") + /// ``` + /// + /// Just the Position3D components in the "points" entity: + /// ```python + /// recording.view(index="my_index", contents={"points": "Position3D"}) + /// ``` + #[allow(clippy::fn_params_excessive_bools)] + #[pyo3(signature = ( + *, + index, + contents, + include_semantically_empty_columns = false, + include_indicator_columns = false, + include_tombstone_columns = false, + ))] + fn view( + slf: Bound<'_, Self>, + index: &str, + contents: &Bound<'_, PyAny>, + include_semantically_empty_columns: bool, + include_indicator_columns: bool, + include_tombstone_columns: bool, + ) -> PyResult { + // TODO(jleibs): We should be able to use this to resolve the timeline / contents + //let borrowed_self = slf.borrow(); + + // TODO(jleibs): Need to get this from the remote schema + //let timeline = borrowed_self.store.read().resolve_time_selector(&selector); + let timeline = Timeline::new_sequence(index); + + let contents = Self::extract_contents_expr(contents)?; + + let query = QueryExpression { + view_contents: Some(contents), + include_semantically_empty_columns, + include_indicator_columns, + include_tombstone_columns, + filtered_index: Some(timeline), + filtered_index_range: None, + filtered_index_values: None, + using_index_values: None, + filtered_is_not_null: None, + sparse_fill_strategy: SparseFillStrategy::None, + selection: None, + }; + + let recording = slf.unbind(); + + Ok(PyRecordingView { + recording: PyRecordingHandle::Remote(std::sync::Arc::new(recording)), + query_expression: query, + }) + } + + /// The recording ID of the recording. + fn recording_id(&self) -> String { + self.store_info.store_id.id.as_str().to_owned() + } + + /// The application ID of the recording. + fn application_id(&self) -> String { + self.store_info.application_id.to_string() + } +}
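Taken together, the Python-facing change in this diff is a second way to query a recording on a storage node: `StorageNodeClient.open_recording()` now returns a `RemoteRecording` whose views execute each query on the node and stream the results back, while `download_recording()` (the previous `open_recording()` behaviour) copies the whole recording into a local chunk store first. A rough usage sketch follows; the `connect()` helper, address, recording id, and entity path / component names are assumptions for illustration and are not part of this diff.

```python
# Sketch only: assumes a `connect()` helper that returns a `StorageNodeClient`;
# the address, id, paths and component names below are made up for illustration.
from rerun.remote import connect

client = connect("http://localhost:51234")

# Streaming path: each query runs on the storage node (good for a few small queries).
remote_recording = client.open_recording("some-recording-id")
view = remote_recording.view(
    index="my_index",
    # Subtree expressions ("/**") are rejected for remote recordings, so list exact paths.
    contents={"world/points": ["Position3D"]},
)
table = view.select().read_all()  # select() yields a pyarrow.RecordBatchReader

# Local path: download once into a local chunk store, then query it repeatedly.
local_recording = client.download_recording("some-recording-id")
table = local_recording.view(index="my_index", contents="world/**").select().read_all()
```

Note that, per `extract_contents_expr` and the remote `schema()` stub, subtree expressions (`/**`) and schema inspection are not yet supported for remote recordings, which is why the remote view above lists exact entity paths.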