review
teh-cmc committed Aug 8, 2024
1 parent 8d4e1ef commit 55e7b1a
Showing 1 changed file with 48 additions and 17 deletions.
65 changes: 48 additions & 17 deletions crates/top/rerun/src/commands/rrd/merge_compact.rs
@@ -1,6 +1,7 @@
use std::io::{IsTerminal, Write};

use anyhow::Context as _;
+use itertools::Either;

use re_chunk_store::ChunkStoreConfig;
use re_entity_db::EntityDb;
@@ -16,30 +17,42 @@ pub struct MergeCommand {
/// Paths to read from. Reads from standard input if none are specified.
path_to_input_rrds: Vec<String>,

+    /// Path to write to. Writes to standard output if unspecified.
+    #[arg(short = 'o', long = "output", value_name = "dst.(rrd|rbl)")]
+    path_to_output_rrd: Option<String>,

/// If set, will try to proceed even in the face of IO and/or decoding errors in the input data.
#[clap(long, default_value_t = false)]
best_effort: bool,
}

impl MergeCommand {
pub fn run(&self) -> anyhow::Result<()> {
-        anyhow::ensure!(
-            !std::io::stdout().is_terminal(),
-            "you must redirect the output to a file and/or stream"
-        );

let Self {
path_to_input_rrds,
+            path_to_output_rrd,
best_effort,
} = self;

+        if path_to_output_rrd.is_none() {
+            anyhow::ensure!(
+                !std::io::stdout().is_terminal(),
+                "you must redirect the output to a file and/or stream"
+            );
+        }

// NOTE #1: We're doing headless processing, there's no point in running subscribers, it will just
// (massively) slow us down.
// NOTE #2: We do not want to modify the configuration of the original data in any way
// (e.g. by recompacting it differently), so make sure to disable all these features.
let store_config = ChunkStoreConfig::ALL_DISABLED;

-        merge_and_compact(*best_effort, &store_config, path_to_input_rrds)
+        merge_and_compact(
+            *best_effort,
+            &store_config,
+            path_to_input_rrds,
+            path_to_output_rrd.as_ref(),
+        )
}
}

@@ -50,6 +63,10 @@ pub struct CompactCommand {
/// Paths to read from. Reads from standard input if none are specified.
path_to_input_rrds: Vec<String>,

+    /// Path to write to. Writes to standard output if unspecified.
+    #[arg(short = 'o', long = "output", value_name = "dst.(rrd|rbl)")]
+    path_to_output_rrd: Option<String>,

/// What is the threshold, in bytes, after which a Chunk cannot be compacted any further?
///
/// Overrides RERUN_CHUNK_MAX_BYTES if set.
@@ -77,19 +94,22 @@ pub struct CompactCommand {

impl CompactCommand {
pub fn run(&self) -> anyhow::Result<()> {
-        anyhow::ensure!(
-            !std::io::stdout().is_terminal(),
-            "you must redirect the output to a file and/or stream"
-        );

let Self {
path_to_input_rrds,
+            path_to_output_rrd,
max_bytes,
max_rows,
max_rows_if_unsorted,
best_effort,
} = self;

+        if path_to_output_rrd.is_none() {
+            anyhow::ensure!(
+                !std::io::stdout().is_terminal(),
+                "you must redirect the output to a file and/or stream"
+            );
+        }

let mut store_config = ChunkStoreConfig::from_env().unwrap_or_default();
// NOTE: We're doing headless processing, there's no point in running subscribers, it will just
// (massively) slow us down.
@@ -105,14 +125,20 @@ impl CompactCommand {
store_config.chunk_max_rows_if_unsorted = *max_rows_if_unsorted;
}

-        merge_and_compact(*best_effort, &store_config, path_to_input_rrds)
+        merge_and_compact(
+            *best_effort,
+            &store_config,
+            path_to_input_rrds,
+            path_to_output_rrd.as_ref(),
+        )
}
}

fn merge_and_compact(
best_effort: bool,
store_config: &ChunkStoreConfig,
path_to_input_rrds: &[String],
+    path_to_output_rrd: Option<&String>,
) -> anyhow::Result<()> {
let file_size_to_string = |size: Option<u64>| {
size.map_or_else(
@@ -170,7 +196,13 @@ fn merge_and_compact(
}
}

-    let mut rrd_out = std::io::BufWriter::new(std::io::stdout().lock());
+    let mut rrd_out = if let Some(path) = path_to_output_rrd {
+        Either::Left(std::io::BufWriter::new(
+            std::fs::File::create(path).with_context(|| format!("{path:?}"))?,
+        ))
+    } else {
+        Either::Right(std::io::BufWriter::new(std::io::stdout().lock()))
+    };

let messages_rbl = entity_dbs
.values()
@@ -202,10 +234,9 @@ fn merge_and_compact(
rrd_out.flush().context("couldn't flush output")?;

let rrds_in_size = rx_size_bytes.recv().ok();
-    let compaction_ratio = if let (Some(rrds_in_size), rrd_out_size) = (rrds_in_size, rrd_out_size)
-    {
+    let size_reduction = if let (Some(rrds_in_size), rrd_out_size) = (rrds_in_size, rrd_out_size) {
        format!(
-            "{:3.3}%",
+            "-{:3.3}%",
100.0 - rrd_out_size as f64 / (rrds_in_size as f64 + f64::EPSILON) * 100.0
)
} else {
@@ -215,7 +246,7 @@
re_log::info!(
dst_size_bytes = %file_size_to_string(Some(rrd_out_size)),
time = ?now.elapsed(),
-        compaction_ratio,
+        size_reduction,
srcs = ?path_to_input_rrds,
srcs_size_bytes = %file_size_to_string(rrds_in_size),
"merge/compaction finished"
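
A note on the output selection in `merge_and_compact` above: `itertools::Either` is a re-export of the `either` crate's type, which implements `std::io::Write` whenever both of its variants do, so the command can write to either a file or locked stdout through a single variable without boxing a trait object. A minimal standalone sketch of the same pattern, assuming the hypothetical helper name `open_output` (not part of the crate):

use std::io::Write;

use anyhow::Context as _;
use itertools::Either;

// Pick a concrete writer at runtime. `Either<L, R>` implements `io::Write`
// when both `L` and `R` do, so callers see a single writer type.
fn open_output(path_to_output_rrd: Option<&String>) -> anyhow::Result<impl Write> {
    Ok(if let Some(path) = path_to_output_rrd {
        Either::Left(std::io::BufWriter::new(
            std::fs::File::create(path).with_context(|| format!("{path:?}"))?,
        ))
    } else {
        Either::Right(std::io::BufWriter::new(std::io::stdout().lock()))
    })
}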

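On the relocated `is_terminal` guard: with an explicit `-o`/`--output` path the commands no longer need stdout at all, so refusing to run is only appropriate when no output path was given and stdout is an interactive terminal (dumping raw RRD bytes to a TTY is never useful). A sketch of that check in isolation, with a hypothetical function name:

use std::io::IsTerminal as _;

/// Only error out when we would write binary RRD data straight to a TTY,
/// i.e. when there is no `--output` path and stdout is not redirected.
fn ensure_output_is_redirected(path_to_output_rrd: Option<&String>) -> anyhow::Result<()> {
    if path_to_output_rrd.is_none() {
        anyhow::ensure!(
            !std::io::stdout().is_terminal(),
            "you must redirect the output to a file and/or stream"
        );
    }
    Ok(())
}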
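
Finally, the log field renamed from `compaction_ratio` to `size_reduction`: the value is the percentage by which the output shrank relative to the combined inputs, now printed with an explicit leading minus sign. An illustrative restatement of the arithmetic (the free function is hypothetical; in the code above the expression is inlined):

/// 100 - out/in * 100, i.e. how much smaller the output is than the inputs.
/// The `f64::EPSILON` term only guards against division by zero on empty input.
fn size_reduction(rrds_in_size: u64, rrd_out_size: u64) -> String {
    format!(
        "-{:3.3}%",
        100.0 - rrd_out_size as f64 / (rrds_in_size as f64 + f64::EPSILON) * 100.0
    )
}

For example, 100 MiB of inputs compacted down to 75 MiB logs roughly "-25.000%".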