Skip to content

Commit

Permalink
rrd merge|compact: impl stdout support
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Aug 8, 2024
1 parent 3bb63b0 commit 8d4e1ef
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 69 deletions.
85 changes: 29 additions & 56 deletions crates/top/rerun/src/commands/rrd/merge_compact.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::io::{IsTerminal, Write};

use anyhow::Context as _;

Expand All @@ -16,19 +16,20 @@ pub struct MergeCommand {
/// Paths to read from. Reads from standard input if none are specified.
path_to_input_rrds: Vec<String>,

#[arg(short = 'o', long = "output", value_name = "dst.(rrd|rbl)")]
path_to_output_rrd: String,

/// If set, will try to proceed even in the face of IO and/or decoding errors in the input data.
#[clap(long, default_value_t = false)]
best_effort: bool,
}

impl MergeCommand {
pub fn run(&self) -> anyhow::Result<()> {
anyhow::ensure!(
!std::io::stdout().is_terminal(),
"you must redirect the output to a file and/or stream"
);

let Self {
path_to_input_rrds,
path_to_output_rrd,
best_effort,
} = self;

Expand All @@ -38,12 +39,7 @@ impl MergeCommand {
// (e.g. by recompacting it differently), so make sure to disable all these features.
let store_config = ChunkStoreConfig::ALL_DISABLED;

merge_and_compact(
*best_effort,
&store_config,
path_to_input_rrds,
path_to_output_rrd,
)
merge_and_compact(*best_effort, &store_config, path_to_input_rrds)
}
}

Expand All @@ -54,9 +50,6 @@ pub struct CompactCommand {
/// Paths to read from. Reads from standard input if none are specified.
path_to_input_rrds: Vec<String>,

#[arg(short = 'o', long = "output", value_name = "dst.(rrd|rbl)")]
path_to_output_rrd: String,

/// What is the threshold, in bytes, after which a Chunk cannot be compacted any further?
///
/// Overrides RERUN_CHUNK_MAX_BYTES if set.
Expand Down Expand Up @@ -84,9 +77,13 @@ pub struct CompactCommand {

impl CompactCommand {
pub fn run(&self) -> anyhow::Result<()> {
anyhow::ensure!(
!std::io::stdout().is_terminal(),
"you must redirect the output to a file and/or stream"
);

let Self {
path_to_input_rrds,
path_to_output_rrd,
max_bytes,
max_rows,
max_rows_if_unsorted,
Expand All @@ -108,39 +105,15 @@ impl CompactCommand {
store_config.chunk_max_rows_if_unsorted = *max_rows_if_unsorted;
}

merge_and_compact(
*best_effort,
&store_config,
path_to_input_rrds,
path_to_output_rrd,
)
merge_and_compact(*best_effort, &store_config, path_to_input_rrds)
}
}

fn merge_and_compact(
best_effort: bool,
store_config: &ChunkStoreConfig,
path_to_input_rrds: &[String],
path_to_output_rrd: &str,
) -> anyhow::Result<()> {
let path_to_output_rrd = PathBuf::from(path_to_output_rrd);

let rrds_in_size = {
let rrds_in: Result<Vec<_>, _> = path_to_input_rrds
.iter()
.map(|path_to_input_rrd| {
std::fs::File::open(path_to_input_rrd)
.with_context(|| format!("{path_to_input_rrd:?}"))
})
.collect();
rrds_in.ok().and_then(|rrds_in| {
rrds_in
.iter()
.map(|rrd_in| rrd_in.metadata().ok().map(|md| md.len()))
.sum::<Option<u64>>()
})
};

let file_size_to_string = |size: Option<u64>| {
size.map_or_else(
|| "<unknown>".to_owned(),
Expand All @@ -159,7 +132,8 @@ fn merge_and_compact(

// TODO(cmc): might want to make this configurable at some point.
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let rx = read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);
let (rx, rx_size_bytes) =
read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);

let mut entity_dbs: std::collections::HashMap<StoreId, EntityDb> = Default::default();

Expand Down Expand Up @@ -196,8 +170,7 @@ fn merge_and_compact(
}
}

let mut rrd_out = std::fs::File::create(&path_to_output_rrd)
.with_context(|| format!("{path_to_output_rrd:?}"))?;
let mut rrd_out = std::io::BufWriter::new(std::io::stdout().lock());

let messages_rbl = entity_dbs
.values()
Expand All @@ -216,7 +189,7 @@ fn merge_and_compact(
.and_then(|db| db.store_info())
.and_then(|info| info.store_version)
.unwrap_or(re_build_info::CrateVersion::LOCAL);
re_log_encoding::encoder::encode(
let rrd_out_size = re_log_encoding::encoder::encode(
version,
encoding_options,
// NOTE: We want to make sure all blueprints come first, so that the viewer can immediately
Expand All @@ -226,21 +199,21 @@ fn merge_and_compact(
)
.context("couldn't encode messages")?;

let rrd_out_size = rrd_out.metadata().ok().map(|md| md.len());
rrd_out.flush().context("couldn't flush output")?;

let compaction_ratio =
if let (Some(rrds_in_size), Some(rrd_out_size)) = (rrds_in_size, rrd_out_size) {
format!(
"{:3.3}%",
100.0 - rrd_out_size as f64 / (rrds_in_size as f64 + f64::EPSILON) * 100.0
)
} else {
"N/A".to_owned()
};
let rrds_in_size = rx_size_bytes.recv().ok();
let compaction_ratio = if let (Some(rrds_in_size), rrd_out_size) = (rrds_in_size, rrd_out_size)
{
format!(
"{:3.3}%",
100.0 - rrd_out_size as f64 / (rrds_in_size as f64 + f64::EPSILON) * 100.0
)
} else {
"N/A".to_owned()
};

re_log::info!(
dst = ?path_to_output_rrd,
dst_size_bytes = %file_size_to_string(rrd_out_size),
dst_size_bytes = %file_size_to_string(Some(rrd_out_size)),
time = ?now.elapsed(),
compaction_ratio,
srcs = ?path_to_input_rrds,
Expand Down
8 changes: 4 additions & 4 deletions crates/top/rerun/src/commands/rrd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum RrdCommands {
/// Example: `rerun rrd print /my/recordings/*.rrd`
Print(PrintCommand),

/// Compacts the contents of one or more .rrd/.rbl files/streams and writes the result to a new file.
/// Compacts the contents of one or more .rrd/.rbl files/streams and writes the result to standard output.
///
/// Reads from standard input if no paths are specified.
///
Expand All @@ -41,16 +41,16 @@ pub enum RrdCommands {
///
/// * `RERUN_CHUNK_MAX_ROWS=4096 RERUN_CHUNK_MAX_BYTES=1048576 rerun rrd compact /my/recordings/*.rrd > output.rrd`
///
/// * `rerun rrd compact --max-rows 4096 --max-bytes=1048576 /my/recordings/*.rrd -o output.rrd`
/// * `rerun rrd compact --max-rows 4096 --max-bytes=1048576 /my/recordings/*.rrd > output.rrd`
Compact(CompactCommand),

/// Merges the contents of multiple .rrd/.rbl files/streams, and writes the result to a new file.
/// Merges the contents of multiple .rrd/.rbl files/streams, and writes the result to standard output.
///
/// Reads from standard input if no paths are specified.
///
/// This will not affect the chunking of the data in any way.
///
/// Example: `rerun merge /my/recordings/*.rrd -o output.rrd`
/// Example: `rerun rrd merge /my/recordings/*.rrd > output.rrd`
Merge(MergeCommand),
}

Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun/src/commands/rrd/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl PrintCommand {

// TODO(cmc): might want to make this configurable at some point.
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let rx = read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);
let (rx, _) = read_rrd_streams_from_file_or_stdin(version_policy, path_to_input_rrds);

for res in rx {
let mut is_success = true;
Expand Down
31 changes: 23 additions & 8 deletions crates/top/rerun/src/commands/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use re_log_types::LogMsg;
/// Asynchronously decodes potentially multiplexed RRD streams from the given `paths`, or standard
/// input if none are specified.
///
/// The returned channel contains both the successfully decoded data, if any, as well as any
/// errors faced during processing.
/// This function returns 2 channels:
/// * The first channel contains both the successfully decoded data, if any, as well as any
/// errors faced during processing.
/// * The second channel, which will fire only once, after all processing is done, indicates the
/// total number of bytes processed.
///
/// This function is best-effort: it will try to make progress even in the face of errors.
/// It is up to the user to decide whether and when to stop.
Expand All @@ -22,21 +25,27 @@ use re_log_types::LogMsg;
pub fn read_rrd_streams_from_file_or_stdin(
version_policy: re_log_encoding::decoder::VersionPolicy,
paths: &[String],
) -> channel::Receiver<anyhow::Result<LogMsg>> {
) -> (
channel::Receiver<anyhow::Result<LogMsg>>,
channel::Receiver<u64>,
) {
let path_to_input_rrds = paths.iter().map(PathBuf::from).collect_vec();

// TODO(cmc): might want to make this configurable at some point.
let (tx, rx) = crossbeam::channel::bounded(100);
let (tx_size_bytes, rx_size_bytes) = crossbeam::channel::bounded(1);

_ = std::thread::Builder::new()
.name("rerun-cli-stdin".to_owned())
.spawn(move || {
let mut size_bytes = 0;

if path_to_input_rrds.is_empty() {
// stdin

let stdin = std::io::BufReader::new(std::io::stdin().lock());

let decoder = match re_log_encoding::decoder::Decoder::new_concatenated(
let mut decoder = match re_log_encoding::decoder::Decoder::new_concatenated(
version_policy,
stdin,
)
Expand All @@ -49,10 +58,12 @@ pub fn read_rrd_streams_from_file_or_stdin(
}
};

for res in decoder {
for res in &mut decoder {
let res = res.context("couldn't decode message from stdin -- skipping");
tx.send(res).ok();
}

size_bytes += decoder.size_bytes();
} else {
// file(s)

Expand All @@ -67,7 +78,7 @@ pub fn read_rrd_streams_from_file_or_stdin(
}
};

let decoder =
let mut decoder =
match re_log_encoding::decoder::Decoder::new(version_policy, rrd_file)
.with_context(|| format!("couldn't decode {rrd_path:?} -- skipping"))
{
Expand All @@ -78,15 +89,19 @@ pub fn read_rrd_streams_from_file_or_stdin(
}
};

for res in decoder {
for res in &mut decoder {
let res = res.context("decode rrd message").with_context(|| {
format!("couldn't decode message {rrd_path:?} -- skipping")
});
tx.send(res).ok();
}

size_bytes += decoder.size_bytes();
}
}

tx_size_bytes.send(size_bytes).ok();
});

rx
(rx, rx_size_bytes)
}

0 comments on commit 8d4e1ef

Please sign in to comment.