Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load .rrd file over HTTP #1600

Merged
merged 10 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum Source {
/// The source if a file on disk
File { path: std::path::PathBuf },

/// Streaming an `.rrd` file over http.
RrdHttpStream { url: String },

/// The source is the logging sdk directly, same process.
Sdk,

Expand All @@ -33,7 +36,7 @@ impl Source {
pub fn is_network(&self) -> bool {
match self {
Self::File { .. } | Self::Sdk => false,
Self::WsClient { .. } | Self::TcpServer { .. } => true,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ egui_dock = { workspace = true, features = ["serde"] }
egui_extras = { workspace = true, features = ["tracing"] }
egui-notify = "0.6"
egui-wgpu.workspace = true
ehttp = "0.2"
enumset.workspace = true
glam = { workspace = true, features = [
"mint",
Expand Down Expand Up @@ -125,6 +126,12 @@ winapi = "0.3.9"
[target.'cfg(target_arch = "wasm32")'.dependencies]
console_error_panic_hook = "0.1.6"
wasm-bindgen-futures = "0.4"
js-sys = "0.3"
wasm-bindgen = "0.2"

[dependencies.web-sys]
version = "0.3.52"
features = ["Window"]


[build-dependencies]
Expand Down
6 changes: 5 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,9 @@ fn wait_screen_ui(ui: &mut egui::Ui, rx: &Receiver<LogMsg>) {
re_smart_channel::Source::File { path } => {
ui.strong(format!("Loading {}…", path.display()));
}
re_smart_channel::Source::RrdHttpStream { url } => {
ui.strong(format!("Loading {url}…"));
}
re_smart_channel::Source::Sdk => {
ready_and_waiting(ui, "Waiting for logging data from SDK");
}
Expand Down Expand Up @@ -1781,7 +1784,8 @@ fn new_recording_confg(
re_smart_channel::Source::File { .. } => PlayState::Playing,

// Live data - follow it!
re_smart_channel::Source::Sdk
re_smart_channel::Source::RrdHttpStream { .. }
| re_smart_channel::Source::Sdk
| re_smart_channel::Source::WsClient { .. }
| re_smart_channel::Source::TcpServer { .. } => PlayState::Following,
};
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod env_vars;
pub mod math;
mod misc;
mod remote_viewer_app;
pub mod stream_rrd_from_http;
mod ui;
mod viewer_analytics;

Expand Down
60 changes: 35 additions & 25 deletions crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,41 @@ impl RemoteViewerApp {

re_log::info!("Connecting to WS server at {:?}…", self.url);

let connection =
re_ws_comms::Connection::viewer_to_server(self.url.clone(), move |binary: Vec<u8>| {
match re_ws_comms::decode_log_msg(&binary) {
Ok(log_msg) => {
if tx.send(log_msg).is_ok() {
egui_ctx.request_repaint(); // Wake up UI thread
std::ops::ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
std::ops::ControlFlow::Break(())
}
}
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
let callback = move |binary: Vec<u8>| {
match re_ws_comms::decode_log_msg(&binary) {
Ok(log_msg) => {
if tx.send(log_msg).is_ok() {
egui_ctx.request_repaint(); // Wake up UI thread
std::ops::ControlFlow::Continue(())
} else {
re_log::info!("Failed to send log message to viewer - closing");
std::ops::ControlFlow::Break(())
}
}
})
.unwrap(); // TODO(emilk): handle error
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
std::ops::ControlFlow::Break(())
}
}
};

let app = crate::App::from_receiver(
self.build_info,
&self.app_env,
self.startup_options,
self.re_ui.clone(),
storage,
rx,
);
match re_ws_comms::Connection::viewer_to_server(self.url.clone(), callback) {
Ok(connection) => {
let app = crate::App::from_receiver(
self.build_info,
&self.app_env,
self.startup_options,
self.re_ui.clone(),
storage,
rx,
);

self.app = Some((connection, app));
self.app = Some((connection, app));
}
Err(err) => {
re_log::error!("Failed to connect to {:?}: {}", self.url, err);
}
}
}

#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -118,6 +123,11 @@ impl eframe::App for RemoteViewerApp {

if let Some((_, app)) = &mut self.app {
app.update(egui_ctx, frame);
} else {
egui::CentralPanel::default().show(egui_ctx, |ui| {
// TODO(emilk): show the error message.
ui.label("An error occurred.\nCheck the debug console for details.");
});
}
}
}
120 changes: 120 additions & 0 deletions crates/re_viewer/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
pub fn stream_rrd_from_http_to_channel(
url: String,
) -> re_smart_channel::Receiver<re_log_types::LogMsg> {
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::RrdHttpStream {
url: url.clone(),
});
stream_rrd_from_http(
url,
Box::new(move |msg| {
tx.send(msg).ok();
}),
);
rx
}

pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
re_log::debug!("Downloading .rrd file from {url:?}…");

// TODO(emilk): stream the http request, progressively decoding the .rrd file.
ehttp::fetch(ehttp::Request::get(&url), move |result| match result {
Ok(response) => {
if response.ok {
re_log::debug!("Decoding .rrd file from {url:?}…");
decode_rrd(response.bytes, on_msg);
} else {
re_log::error!(
"Failed to fetch .rrd file from {url}: {} {}",
response.status,
response.status_text
);
}
}
Err(err) => {
re_log::error!("Failed to fetch .rrd file from {url}: {err}");
}
});
}

#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::needless_pass_by_value)] // must match wasm version
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
match re_log_types::encoding::Decoder::new(rrd_bytes.as_slice()) {
Ok(decoder) => {
for msg in decoder {
match msg {
Ok(msg) => {
on_msg(msg);
}
Err(err) => {
re_log::warn_once!("Failed to decode message: {err}");
}
}
}
}
Err(err) => {
re_log::error!("Failed to decode .rrd: {err}");
}
}
}

#[cfg(target_arch = "wasm32")]
mod web_decode {
pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>) {
wasm_bindgen_futures::spawn_local(decode_rrd_async(rrd_bytes, on_msg))
}

/// Decodes the file in chunks, with an yield between each chunk.
///
/// This is cooperative multi-tasking.
async fn decode_rrd_async(
rrd_bytes: Vec<u8>,
on_msg: Box<dyn Fn(re_log_types::LogMsg) + Send>,
) {
let mut last_yield = instant::Instant::now();

match re_log_types::encoding::Decoder::new(rrd_bytes.as_slice()) {
Ok(decoder) => {
for msg in decoder {
match msg {
Ok(msg) => {
on_msg(msg);
}
Err(err) => {
re_log::warn_once!("Failed to decode message: {err}");
}
}

if last_yield.elapsed() > instant::Duration::from_millis(10) {
// yield to the ui task
yield_().await;
last_yield = instant::Instant::now();
}
}
}
Err(err) => {
re_log::error!("Failed to decode .rrd: {err}");
}
}
}

// Yield to other tasks
async fn yield_() {
sleep_ms(1).await; // TODO(emilk): create a better async yield function
}

// Hack to get async sleep on wasm
async fn sleep_ms(millis: i32) {
let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, millis)
.expect("Failed to call set_timeout");
};
let p = js_sys::Promise::new(&mut cb);
wasm_bindgen_futures::JsFuture::from(p).await.unwrap();
}
}

#[cfg(target_arch = "wasm32")]
use web_decode::decode_rrd;
3 changes: 2 additions & 1 deletion crates/re_viewer/src/viewer_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ impl ViewerAnalytics {
if let Some(data_source) = &log_db.data_source {
let data_source = match data_source {
re_smart_channel::Source::File { .. } => "file", // .rrd
re_smart_channel::Source::Sdk => "sdk", // show()
re_smart_channel::Source::RrdHttpStream { .. } => "http",
re_smart_channel::Source::Sdk => "sdk", // show()
re_smart_channel::Source::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::Source::TcpServer { .. } => "tcp_server", // connect()
};
Expand Down
Loading