Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand remote APIs to allow for server-side query execution #8537

Merged
merged 8 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7280,6 +7280,7 @@ dependencies = [
"re_chunk",
"re_chunk_store",
"re_dataframe",
"re_grpc_client",
"re_log",
"re_log_encoding",
"re_log_types",
Expand Down
6 changes: 3 additions & 3 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use re_protos::remote_store::v0::{

/// Wrapper with a nicer error message
#[derive(Debug)]
struct TonicStatusError(tonic::Status);
pub struct TonicStatusError(pub tonic::Status);

impl std::fmt::Display for TonicStatusError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -65,7 +65,7 @@ impl Error for TonicStatusError {
}

#[derive(thiserror::Error, Debug)]
enum StreamError {
pub enum StreamError {
/// Native connection error
#[cfg(not(target_arch = "wasm32"))]
#[error(transparent)]
Expand Down Expand Up @@ -268,7 +268,7 @@ async fn stream_recording_async(
Ok(())
}

fn store_info_from_catalog_chunk(
pub fn store_info_from_catalog_chunk(
tc: &TransportChunk,
recording_id: &str,
) -> Result<StoreInfo, StreamError> {
Expand Down
6 changes: 6 additions & 0 deletions crates/store/re_log_types/src/path/entity_path_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,12 @@ impl EntityPathFilter {
// inclusion rule and didn't hit an exclusion rule.
true
}

/// Iterate over all rules in the filter, together with the effect
/// (include/exclude) each rule applies.
///
/// Note: this exposes the filter's internal rule storage for read-only
/// inspection; callers must not rely on a particular iteration order
/// unless the underlying storage guarantees one.
#[inline]
pub fn rules(&self) -> impl Iterator<Item = (&EntityPathRule, &RuleEffect)> {
    self.rules.iter()
}
}

impl EntityPathRule {
Expand Down
2 changes: 2 additions & 0 deletions rerun_py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ remote = [
"dep:object_store",
"dep:re_protos",
"dep:re_ws_comms",
"dep:re_grpc_client",
"dep:tokio",
"dep:tokio-stream",
"dep:tonic",
Expand All @@ -61,6 +62,7 @@ re_build_info.workspace = true
re_chunk = { workspace = true, features = ["arrow"] }
re_chunk_store.workspace = true
re_dataframe.workspace = true
re_grpc_client = { workspace = true, optional = true }
re_log = { workspace = true, features = ["setup"] }
re_log_encoding = { workspace = true }
re_log_types.workspace = true
Expand Down
22 changes: 21 additions & 1 deletion rerun_py/rerun_bindings/rerun_bindings.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,27 @@ class StorageNodeClient:
"""
Open a [`Recording`][rerun.dataframe.Recording] by id to use with the dataframe APIs.

This currently downloads the full recording to the local machine.
This will run queries against the remote storage node and stream the results. This is faster
for small numbers of queries with small results.

Parameters
----------
id : str
The id of the recording to open.

Returns
-------
Recording
The opened recording.

"""
...

def download_recording(self, id: str) -> Recording:
"""
Download a [`Recording`][rerun.dataframe.Recording] by id to use with the dataframe APIs.

This will download the full recording to memory and run queries against a local chunk store.

Parameters
----------
Expand Down
87 changes: 66 additions & 21 deletions rerun_py/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ use re_log_encoding::VersionPolicy;
use re_log_types::{EntityPathFilter, ResolvedTimeRange, TimeType};
use re_sdk::{ComponentName, EntityPath, StoreId, StoreKind};

#[cfg(feature = "remote")]
use crate::remote::PyRemoteRecording;

/// Register the `rerun.dataframe` module.
pub(crate) fn register(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PySchema>()?;
Expand Down Expand Up @@ -427,7 +430,7 @@ impl IndexValuesLike<'_> {
}
}

struct ComponentLike(String);
pub struct ComponentLike(pub String);

impl FromPyObject<'_> for ComponentLike {
fn extract_bound(component: &Bound<'_, PyAny>) -> PyResult<Self> {
Expand Down Expand Up @@ -570,6 +573,13 @@ pub struct PyRecording {
pub(crate) cache: re_dataframe::QueryCacheHandle,
}

/// A handle to the recording that backs a `PyRecordingView`.
///
/// Either an in-process (local) recording, or — when the `remote` feature is
/// enabled — a recording reached through a remote storage-node client, in
/// which case queries are expected to be executed server-side.
#[derive(Clone)]
pub enum PyRecordingHandle {
    /// Recording whose data lives in the local process.
    Local(std::sync::Arc<Py<PyRecording>>),
    /// Recording served by a remote storage node (requires the `remote` feature).
    #[cfg(feature = "remote")]
    Remote(std::sync::Arc<Py<PyRemoteRecording>>),
}

/// A view of a recording restricted to a given index, containing a specific set of entities and components.
///
/// See [`Recording.view(…)`][rerun.dataframe.Recording.view] for details on how to create a `RecordingView`.
Expand All @@ -588,9 +598,9 @@ pub struct PyRecording {
#[pyclass(name = "RecordingView")]
#[derive(Clone)]
pub struct PyRecordingView {
recording: std::sync::Arc<Py<PyRecording>>,
pub(crate) recording: PyRecordingHandle,

query_expression: QueryExpression,
pub(crate) query_expression: QueryExpression,
}

impl PyRecordingView {
Expand Down Expand Up @@ -638,8 +648,10 @@ impl PyRecordingView {
///
/// This schema will only contain the columns that are included in the view via
/// the view contents.
fn schema(&self, py: Python<'_>) -> PySchema {
let borrowed = self.recording.borrow(py);
fn schema(&self, py: Python<'_>) -> PyResult<PySchema> {
match &self.recording {
PyRecordingHandle::Local(recording) => {
let borrowed: PyRef<'_, PyRecording> = recording.borrow(py);
let engine = borrowed.engine();

let mut query_expression = self.query_expression.clone();
Expand All @@ -649,8 +661,14 @@ impl PyRecordingView {

let contents = query_handle.view_contents();

PySchema {
Ok(PySchema {
schema: contents.to_vec(),
})
}
#[cfg(feature = "remote")]
PyRecordingHandle::Remote(_) => Err::<_, PyErr>(PyRuntimeError::new_err(
"Schema is not implemented for remote recordings yet.",
)),
}
}

Expand Down Expand Up @@ -691,13 +709,14 @@ impl PyRecordingView {
args: &Bound<'_, PyTuple>,
columns: Option<Vec<AnyColumn>>,
) -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
let borrowed = self.recording.borrow(py);
let engine = borrowed.engine();

let mut query_expression = self.query_expression.clone();

query_expression.selection = Self::select_args(args, columns)?;

match &self.recording {
PyRecordingHandle::Local(recording) => {
let borrowed = recording.borrow(py);
let engine = borrowed.engine();

let query_handle = engine.query(query_expression);

// If the only contents found are static, we might need to warn the user since
Expand All @@ -722,7 +741,8 @@ impl PyRecordingView {
.filter(|c| matches!(c, ColumnDescriptor::Component(_)))
.collect::<Vec<_>>();

let any_selected_data_is_static = selected_data_columns.iter().any(|c| c.is_static());
let any_selected_data_is_static =
selected_data_columns.iter().any(|c| c.is_static());

if self.query_expression.using_index_values.is_none()
&& all_contents_are_static
Expand All @@ -743,9 +763,19 @@ impl PyRecordingView {
.map(|batch| batch.try_to_arrow_record_batch()),
std::sync::Arc::new(schema),
);

Ok(PyArrowType(Box::new(reader)))
}
#[cfg(feature = "remote")]
PyRecordingHandle::Remote(recording) => {
let borrowed_recording = recording.borrow(py);
let mut borrowed_client = borrowed_recording.client.borrow_mut(py);
borrowed_client.exec_query(
borrowed_recording.store_info.store_id.clone(),
query_expression,
)
}
}
}

/// Select only the static columns from the view.
///
Expand Down Expand Up @@ -780,26 +810,30 @@ impl PyRecordingView {
args: &Bound<'_, PyTuple>,
columns: Option<Vec<AnyColumn>>,
) -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
let borrowed = self.recording.borrow(py);
let engine = borrowed.engine();

let mut query_expression = self.query_expression.clone();

// This is a static selection, so we clear the filtered index
query_expression.filtered_index = None;

// If no columns provided, select all static columns
let static_columns = Self::select_args(args, columns)?.unwrap_or_else(|| {
self.schema(py)
let static_columns = Self::select_args(args, columns)
.transpose()
.unwrap_or_else(|| {
Ok(self
.schema(py)?
.schema
.iter()
.filter(|col| col.is_static())
.map(|col| col.clone().into())
.collect()
});
.collect())
})?;

query_expression.selection = Some(static_columns);

match &self.recording {
PyRecordingHandle::Local(recording) => {
let borrowed = recording.borrow(py);
let engine = borrowed.engine();

let query_handle = engine.query(query_expression);

let non_static_cols = query_handle
Expand Down Expand Up @@ -829,6 +863,17 @@ impl PyRecordingView {

Ok(PyArrowType(Box::new(reader)))
}
#[cfg(feature = "remote")]
PyRecordingHandle::Remote(recording) => {
let borrowed_recording = recording.borrow(py);
let mut borrowed_client = borrowed_recording.client.borrow_mut(py);
borrowed_client.exec_query(
borrowed_recording.store_info.store_id.clone(),
query_expression,
)
}
}
}

#[allow(rustdoc::private_doc_tests)]
/// Filter the view to only include data between the given index sequence numbers.
Expand Down Expand Up @@ -1336,7 +1381,7 @@ impl PyRecording {
let recording = slf.unbind();

Ok(PyRecordingView {
recording: std::sync::Arc::new(recording),
recording: PyRecordingHandle::Local(std::sync::Arc::new(recording)),
query_expression: query,
})
}
Expand Down
Loading
Loading