From e26366234fdf555d286fc9896fc43a339293a579 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 23 May 2024 10:04:43 +0100 Subject: [PATCH] Introduce bifrost benchpress We introduce bifrost benchpress, a binary that's used to run simulation perf tests focused on bifrost loglets performance. Initial code contains tests to reliably measure local loglet's critical-path single log append latency and write-to-read latency with a given configuration. The benchmark utility is built to give full control over how the benchmark runs, in contrast to criterion which is designed for micro benches. --- Cargo.lock | 66 ++++++- Cargo.toml | 1 + crates/bifrost/src/error.rs | 4 +- crates/types/src/config/admin.rs | 12 -- crates/types/src/config/bifrost.rs | 12 -- crates/types/src/config/common.rs | 6 + crates/types/src/config/mod.rs | 27 ++- crates/types/src/config/worker.rs | 16 +- server/Cargo.toml | 1 - tools/bifrost-benchpress/Cargo.toml | 50 ++++++ tools/bifrost-benchpress/README.md | 8 + .../bifrost-benchpress/src/append_latency.rs | 58 ++++++ tools/bifrost-benchpress/src/lib.rs | 58 ++++++ tools/bifrost-benchpress/src/main.rs | 170 ++++++++++++++++++ tools/bifrost-benchpress/src/read_to_write.rs | 117 ++++++++++++ tools/bifrost-benchpress/src/util.rs | 111 ++++++++++++ 16 files changed, 665 insertions(+), 52 deletions(-) create mode 100644 tools/bifrost-benchpress/Cargo.toml create mode 100644 tools/bifrost-benchpress/README.md create mode 100644 tools/bifrost-benchpress/src/append_latency.rs create mode 100644 tools/bifrost-benchpress/src/lib.rs create mode 100644 tools/bifrost-benchpress/src/main.rs create mode 100644 tools/bifrost-benchpress/src/read_to_write.rs create mode 100644 tools/bifrost-benchpress/src/util.rs diff --git a/Cargo.lock b/Cargo.lock index e22ce20a39..c82f2e230c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1209,6 +1209,49 @@ dependencies = [ "smallvec", ] +[[package]] +name = "bifrost-benchpress" +version = "0.9.1" +dependencies = [ + "anyhow", + "bytes", + "bytestring", + "clap", + "codederror", + "figment", + "futures", + "hdrhistogram", + "metrics", + "metrics-exporter-prometheus 0.14.0", + "once_cell", + "quanta", + "restate-bifrost", + "restate-core", + "restate-errors", + "restate-metadata-store", + "restate-rocksdb", + "restate-server", + "restate-test-util", + "restate-tracing-instrumentation", + "restate-types", + "rocksdb", + "serde", + "serde_json", + "serde_with", + "smallvec", + "static_assertions", + "strum 0.26.2", + "strum_macros 0.26.2", + "tempfile", + "thiserror", + "tikv-jemallocator", + "tokio", + "tokio-stream", + "toml", + "tracing", + "tracing-subscriber", +] + [[package]] name = "bincode" version = "2.0.0-rc.3" @@ -2959,6 +3002,7 @@ checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" dependencies = [ "base64 0.21.7", "byteorder", + "crossbeam-channel", "flate2", "nom", "num-traits", @@ -3784,6 +3828,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "metrics-exporter-prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d58e362dc7206e9456ddbcdbd53c71ba441020e62104703075a69151e38d85f" +dependencies = [ + "base64 0.22.0", + "hyper-util", + "indexmap 2.2.6", + "metrics", + "metrics-util", + "quanta", + "thiserror", + "tokio", +] + [[package]] name = "metrics-tracing-context" version = "0.15.0" @@ -4911,9 +4971,9 @@ dependencies = [ [[package]] name = "quanta" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" dependencies = [ "crossbeam-utils", "libc", @@ -5721,7 +5781,7 @@ dependencies = [ "humantime", "hyper 0.14.28", "metrics", - "metrics-exporter-prometheus", + "metrics-exporter-prometheus 0.13.1", "metrics-tracing-context", "metrics-util", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index e9d24b96cb..62b8e97845 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "server", "tools/service-protocol-wireshark-dissector", "tools/xtask", + "tools/bifrost-benchpress", ] default-members = [ "cli", diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index a96ac3c526..2076e16336 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -21,9 +21,9 @@ use crate::types::SealReason; pub enum Error { #[error("log '{0}' is sealed")] LogSealed(LogId, SealReason), - #[error("unknown log '{0}")] + #[error("unknown log '{0}'")] UnknownLogId(LogId), - #[error("invalid log sequence number '{0}")] + #[error("invalid log sequence number '{0}'")] InvalidLsn(Lsn), #[error("operation failed due to an ongoing shutdown")] Shutdown(#[from] ShutdownError), diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs index 94bbf3688b..3f1e5c0788 100644 --- a/crates/types/src/config/admin.rs +++ b/crates/types/src/config/admin.rs @@ -37,10 +37,6 @@ pub struct AdminOptions { concurrent_api_requests_limit: Option, pub query_engine: QueryEngineOptions, - #[cfg(any(test, feature = "test-util"))] - #[serde(skip, default = "super::default_arc_tmp")] - data_dir: std::sync::Arc, - /// # Controller heartbeats /// /// Controls the interval at which cluster controller polls nodes of the cluster. @@ -50,16 +46,10 @@ pub struct AdminOptions { } impl AdminOptions { - #[cfg(not(any(test, feature = "test-util")))] pub fn data_dir(&self) -> PathBuf { super::data_dir("registry") } - #[cfg(any(test, feature = "test-util"))] - pub fn data_dir(&self) -> PathBuf { - self.data_dir.path().join("registry") - } - pub fn concurrent_api_requests_limit(&self) -> usize { std::cmp::min( self.concurrent_api_requests_limit @@ -77,8 +67,6 @@ impl Default for AdminOptions { // max is limited by Tower's LoadShedLayer. concurrent_api_requests_limit: None, query_engine: Default::default(), - #[cfg(any(test, feature = "test-util"))] - data_dir: super::default_arc_tmp(), heartbeat_interval: Duration::from_millis(1500).into(), } } diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index 3cd65002f3..5466886447 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -75,22 +75,12 @@ pub struct LocalLogletOptions { /// /// Default: True. pub batch_wal_flushes: bool, - - #[cfg(any(test, feature = "test-util"))] - #[serde(skip, default = "super::default_arc_tmp")] - data_dir: std::sync::Arc, } impl LocalLogletOptions { - #[cfg(not(any(test, feature = "test-util")))] pub fn data_dir(&self) -> PathBuf { super::data_dir("local-loglet") } - - #[cfg(any(test, feature = "test-util"))] - pub fn data_dir(&self) -> PathBuf { - self.data_dir.path().join("local-loglet") - } } impl Default for LocalLogletOptions { @@ -106,8 +96,6 @@ impl Default for LocalLogletOptions { sync_wal_before_ack: true, writer_batch_commit_count: 500, writer_batch_commit_duration: Duration::ZERO.into(), - #[cfg(any(test, feature = "test-util"))] - data_dir: super::default_arc_tmp(), } } } diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 998302bb2d..3942c4acb0 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -225,6 +225,12 @@ impl CommonOptions { pub fn cluster_name(&self) -> &str { &self.cluster_name } + + #[cfg(feature = "test-util")] + pub fn set_base_dir(&mut self, path: PathBuf) { + self.base_dir = Some(path); + } + pub fn base_dir(&self) -> PathBuf { self.base_dir.clone().unwrap_or_else(|| { std::env::current_dir() diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs index ccfc6a0166..1bf7123767 100644 --- a/crates/types/src/config/mod.rs +++ b/crates/types/src/config/mod.rs @@ -52,8 +52,9 @@ use crate::errors::GenericError; use crate::nodes_config::Role; #[cfg(any(test, feature = "test-util"))] -pub(crate) fn default_arc_tmp() -> std::sync::Arc { - std::sync::Arc::new(tempfile::TempDir::new().unwrap()) +enum TempOrPath { + Temp(tempfile::TempDir), + Path(PathBuf), } static CONFIGURATION: Lazy>> = Lazy::new(Arc::default); @@ -61,8 +62,8 @@ static CONFIGURATION: Lazy>> = Lazy::new(Arc::default static NODE_BASE_DIR: std::sync::OnceLock = std::sync::OnceLock::new(); #[cfg(any(test, feature = "test-util"))] -static NODE_BASE_DIR: Lazy> = - Lazy::new(|| std::sync::RwLock::new(tempfile::TempDir::new().unwrap())); +static NODE_BASE_DIR: Lazy> = + Lazy::new(|| std::sync::RwLock::new(TempOrPath::Temp(tempfile::TempDir::new().unwrap()))); pub type UpdateableConfiguration = Arc>; @@ -77,7 +78,10 @@ fn data_dir(dir: &str) -> PathBuf { #[cfg(any(test, feature = "test-util"))] pub fn data_dir(dir: &str) -> PathBuf { let guard = NODE_BASE_DIR.read().unwrap(); - guard.path().join(dir) + match &*guard { + TempOrPath::Temp(temp) => temp.path().join(dir), + TempOrPath::Path(path) => path.join(dir), + } } pub fn node_filepath(filename: &str) -> PathBuf { @@ -88,8 +92,17 @@ pub fn node_filepath(filename: &str) -> PathBuf { pub fn reset_base_temp_dir() -> PathBuf { let mut guard = NODE_BASE_DIR.write().unwrap(); let new = tempfile::TempDir::new().unwrap(); - *guard = new; - PathBuf::from(guard.path()) + let path = PathBuf::from(new.path()); + *guard = TempOrPath::Temp(new); + path +} + +#[cfg(any(test, feature = "test-util"))] +pub fn reset_base_temp_dir_and_retain() -> PathBuf { + let mut guard = NODE_BASE_DIR.write().unwrap(); + let path = tempfile::TempDir::new().unwrap().into_path(); + *guard = TempOrPath::Path(path.clone()); + path } /// Set the current configuration, this is temporary until we have a dedicated configuration loader diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 865ff12fd1..8a2afdda74 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -196,22 +196,12 @@ impl Default for InvokerOptions { pub struct StorageOptions { #[serde(flatten)] pub rocksdb: RocksDbOptions, - - #[cfg(any(test, feature = "test-util"))] - #[serde(skip, default = "super::default_arc_tmp")] - data_dir: std::sync::Arc, } impl StorageOptions { - #[cfg(not(any(test, feature = "test-util")))] pub fn data_dir(&self) -> PathBuf { super::data_dir("db") } - - #[cfg(any(test, feature = "test-util"))] - pub fn data_dir(&self) -> PathBuf { - self.data_dir.path().join("db") - } } impl Default for StorageOptions { @@ -221,10 +211,6 @@ impl Default for StorageOptions { .build() .expect("valid RocksDbOptions"); - StorageOptions { - rocksdb, - #[cfg(any(test, feature = "test-util"))] - data_dir: super::default_arc_tmp(), - } + StorageOptions { rocksdb } } } diff --git a/server/Cargo.toml b/server/Cargo.toml index a24227ce79..61f502f438 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -32,7 +32,6 @@ restate-errors = { workspace = true, features = ["include_doc"] } restate-fs-util = { workspace = true } restate-node = { workspace = true } restate-rocksdb = { workspace = true } -#restate-partition-store = { workspace = true } restate-tracing-instrumentation = { workspace = true, features = ["rt-tokio"] } restate-types = { workspace = true, features = ["clap"] } restate-worker = { workspace = true } diff --git a/tools/bifrost-benchpress/Cargo.toml b/tools/bifrost-benchpress/Cargo.toml new file mode 100644 index 0000000000..94ca5609d4 --- /dev/null +++ b/tools/bifrost-benchpress/Cargo.toml @@ -0,0 +1,50 @@ +[package] +name = "bifrost-benchpress" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[dependencies] +restate-bifrost = { workspace = true, features = ["test-util"] } +restate-core = { workspace = true, features = ["test-util"] } +restate-errors = { workspace = true } +restate-metadata-store = { workspace = true } +restate-rocksdb = { workspace = true } +restate-server = { workspace = true } +restate-test-util = { workspace = true } +restate-tracing-instrumentation = { workspace = true, features = ["console-subscriber"] } +restate-types = { workspace = true, features = ["test-util"] } + +anyhow = { workspace = true } +bytes = { workspace = true } +bytestring = { workspace = true, features = ["serde"] } +clap = { workspace = true, features = ["derive", "env", "color", "help", "wrap_help", "usage", "suggestions", "error-context", "std"] } +codederror = { workspace = true } +figment = { version = "0.10.8", features = ["env", "toml"] } +futures = { workspace = true } +hdrhistogram = { version = "7.5.4" } +metrics = { workspace = true } +metrics-exporter-prometheus = { version = "0.14", default-features = false, features = ["async-runtime"] } +once_cell = { workspace = true } +quanta = { version = "0.12.3" } +rocksdb = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +serde_with = { workspace = true } +smallvec = { workspace = true } +static_assertions = { workspace = true } +strum = { workspace = true } +strum_macros = { workspace = true } +tempfile = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } +tokio-stream = { workspace = true } +toml = { version = "0.8" } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms"] } diff --git a/tools/bifrost-benchpress/README.md b/tools/bifrost-benchpress/README.md new file mode 100644 index 0000000000..41253fd81d --- /dev/null +++ b/tools/bifrost-benchpress/README.md @@ -0,0 +1,8 @@ +# Bifrost Bench Press + +A simple tool for benchmarking bifrost loglet implementations + +### How to run? +```sh +RUST_LOG=info cargo run --profile=bench --bin bifrost-benchpress -- --config-file=restate.toml --retain-test-dir write-to-read +``` diff --git a/tools/bifrost-benchpress/src/append_latency.rs b/tools/bifrost-benchpress/src/append_latency.rs new file mode 100644 index 0000000000..bbd11dc7dd --- /dev/null +++ b/tools/bifrost-benchpress/src/append_latency.rs @@ -0,0 +1,58 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::time::Instant; + +use bytes::{BufMut, BytesMut}; +use hdrhistogram::Histogram; +use tracing::info; + +use restate_bifrost::Bifrost; +use restate_core::TaskCenter; +use restate_types::logs::{LogId, Payload}; + +use crate::util::print_latencies; +use crate::Arguments; + +#[derive(Debug, Clone, clap::Parser)] +pub struct AppendLatencyOpts { + #[arg(long, default_value = "5000")] + pub num_records: u64, +} + +pub async fn run( + _common_args: &Arguments, + opts: &AppendLatencyOpts, + _tc: TaskCenter, + mut bifrost: Bifrost, +) -> anyhow::Result<()> { + let log_id = LogId::from(0); + let mut bytes = BytesMut::default(); + let raw_data = [1u8; 1024]; + bytes.put_slice(&raw_data); + let mut append_latencies = Histogram::::new(3)?; + let mut counter = 0; + loop { + if counter >= opts.num_records { + break; + } + counter += 1; + let start = Instant::now(); + let payload = Payload::new(bytes.clone()); + let _ = bifrost.append(log_id, payload).await?; + append_latencies.record(start.elapsed().as_nanos() as u64)?; + if counter % 1000 == 0 { + info!("Appended {} records", counter); + } + } + println!("Total records written: {}", counter); + print_latencies("append latency", append_latencies); + Ok(()) +} diff --git a/tools/bifrost-benchpress/src/lib.rs b/tools/bifrost-benchpress/src/lib.rs new file mode 100644 index 0000000000..d1642847a5 --- /dev/null +++ b/tools/bifrost-benchpress/src/lib.rs @@ -0,0 +1,58 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::path::PathBuf; + +use restate_types::config::CommonOptionCliOverride; + +use self::append_latency::AppendLatencyOpts; +use self::read_to_write::WriteToReadOpts; + +pub mod append_latency; +pub mod read_to_write; +pub mod util; + +#[derive(Debug, Clone, clap::Parser)] +#[command(author, version, about)] +pub struct Arguments { + /// Set a configuration file to use for Restate. + /// For more details, check the documentation. + #[arg( + short, + long = "config-file", + env = "RESTATE_CONFIG", + value_name = "FILE" + )] + pub config_file: Option, + + #[arg(long)] + pub no_prometheus_stats: bool, + + #[arg(long)] + pub no_rocksdb_stats: bool, + + #[arg(long)] + pub retain_test_dir: bool, + + #[clap(flatten)] + pub opts_overrides: CommonOptionCliOverride, + + /// Choose the benchmark to run + #[clap(subcommand)] + pub command: Command, +} + +#[derive(Debug, Clone, clap::Parser)] +pub enum Command { + /// Measures the write-to-read latency for a single log + WriteToRead(WriteToReadOpts), + /// Measures the append latency for a single log + AppendLatency(AppendLatencyOpts), +} diff --git a/tools/bifrost-benchpress/src/main.rs b/tools/bifrost-benchpress/src/main.rs new file mode 100644 index 0000000000..93d99c026b --- /dev/null +++ b/tools/bifrost-benchpress/src/main.rs @@ -0,0 +1,170 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::time::Duration; + +use bifrost_benchpress::util::{print_prometheus_stats, print_rocksdb_stats}; +use clap::Parser; +use codederror::CodedError; +use tracing::trace; + +use bifrost_benchpress::{append_latency, read_to_write, Arguments, Command}; +use metrics_exporter_prometheus::PrometheusBuilder; +use restate_bifrost::{Bifrost, BifrostService}; +use restate_core::{ + spawn_metadata_manager, MetadataManager, MockNetworkSender, TaskCenter, TaskCenterBuilder, +}; +use restate_errors::fmt::RestateCode; +use restate_metadata_store::{MetadataStoreClient, Precondition}; +use restate_rocksdb::RocksDbManager; +use restate_server::config_loader::ConfigLoaderBuilder; +use restate_tracing_instrumentation::init_tracing_and_logging; +use restate_types::arc_util::Constant; +use restate_types::config::{reset_base_temp_dir, reset_base_temp_dir_and_retain, Configuration}; +use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; + +// Configure jemalloc similar to mimic restate server +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +fn main() -> anyhow::Result<()> { + let cli_args = Arguments::parse(); + let tmp_base = if cli_args.retain_test_dir { + reset_base_temp_dir_and_retain() + } else { + reset_base_temp_dir() + }; + + // We capture the absolute path of the config file on startup before we change the current + // working directory (base-dir arg) + let config_path = cli_args + .config_file + .as_ref() + .map(|p| std::fs::canonicalize(p).expect("config-file path is valid")); + + // Initial configuration loading + let config_loader = ConfigLoaderBuilder::default() + .load_env(true) + .path(config_path.clone()) + .cli_override(cli_args.opts_overrides.clone()) + .build() + .unwrap(); + + let mut config = match config_loader.load_once() { + Ok(c) => c, + Err(e) => { + // We cannot use tracing here as it's not configured yet + eprintln!("{}", e.decorate()); + eprintln!("{:#?}", RestateCode::from(&e)); + std::process::exit(1); + } + }; + + // just in case anything reads directly the base dir and it's not set correctly for any random + // reason. + config.common.set_base_dir(tmp_base.clone()); + + restate_types::config::set_current_config(config.clone()); + + let recorder = PrometheusBuilder::new().install_recorder().unwrap(); + let (tc, bifrost) = spawn_environment(config.clone(), 1); + let task_center = tc.clone(); + let args = cli_args.clone(); + tc.block_on("benchpress", None, async move { + let tracing_guard = init_tracing_and_logging(&config.common, "Bifrost benchpress") + .expect("failed to configure logging and tracing!"); + + match args.command { + Command::WriteToRead(ref opts) => { + read_to_write::run(&args, opts, task_center.clone(), bifrost).await?; + } + Command::AppendLatency(ref opts) => { + append_latency::run(&args, opts, task_center.clone(), bifrost).await?; + } + } + task_center.shutdown_node("completed", 0).await; + // print prometheus if asked. + if !args.no_prometheus_stats { + print_prometheus_stats(&recorder); + } + + // print rocksdb stats if asked. + if !args.no_rocksdb_stats { + print_rocksdb_stats("local-loglet"); + } + + // We shutdown the database after stats to avoid enclosing the shutdown process in our + // metrics. + RocksDbManager::get().shutdown().await; + // Make sure that all pending spans are flushed + let shutdown_tracing_with_timeout = + tokio::time::timeout(Duration::from_secs(10), tracing_guard.async_shutdown()); + let shutdown_result = shutdown_tracing_with_timeout.await; + + if shutdown_result.is_err() { + trace!("Failed to fully flush pending spans, terminating now."); + } + anyhow::Ok(()) + })?; + + if cli_args.retain_test_dir { + println!("Keeping the base_dir in {}", tmp_base.display()); + } else { + println!("Removing test dir at {}", tmp_base.display()); + } + + Ok(()) +} + +fn spawn_environment(config: Configuration, num_logs: u64) -> (TaskCenter, Bifrost) { + let tc = TaskCenterBuilder::default() + .options(config.common.clone()) + .build() + .expect("task_center builds"); + + restate_types::config::set_current_config(config.clone()); + let task_center = tc.clone(); + let bifrost = tc.block_on("spawn", None, async move { + let network_sender = MockNetworkSender::default(); + let metadata_store_client = MetadataStoreClient::new_in_memory(); + let metadata_manager = + MetadataManager::build(network_sender.clone(), metadata_store_client.clone()); + + let metadata = metadata_manager.metadata(); + let metadata_writer = metadata_manager.writer(); + task_center.try_set_global_metadata(metadata.clone()); + + RocksDbManager::init(Constant::new(config.common)); + + let logs = restate_types::logs::metadata::create_static_metadata( + config.bifrost.default_provider, + num_logs, + ); + + metadata_store_client + .put(BIFROST_CONFIG_KEY.clone(), logs.clone(), Precondition::None) + .await + .expect("to store bifrost config in metadata store"); + metadata_writer.submit(logs); + spawn_metadata_manager(&task_center, metadata_manager).expect("metadata manager starts"); + + let bifrost_svc = BifrostService::new(metadata); + let bifrost = bifrost_svc.handle(); + + // start bifrost service in the background + bifrost_svc.start().await.expect("bifrost starts"); + bifrost + }); + (tc, bifrost) +} diff --git a/tools/bifrost-benchpress/src/read_to_write.rs b/tools/bifrost-benchpress/src/read_to_write.rs new file mode 100644 index 0000000000..abe01ecbaa --- /dev/null +++ b/tools/bifrost-benchpress/src/read_to_write.rs @@ -0,0 +1,117 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::time::{Duration, Instant}; + +use bytes::{Buf, BufMut, BytesMut}; +use hdrhistogram::Histogram; +use tracing::info; + +use restate_bifrost::{Bifrost, Error as BifrostError}; +use restate_bifrost::{LogRecord, Record}; +use restate_core::{cancellation_watcher, TaskCenter, TaskKind}; +use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber}; + +use crate::util::print_latencies; +use crate::Arguments; + +#[derive(Debug, Clone, clap::Parser)] +pub struct WriteToReadOpts {} + +pub async fn run( + _common_args: &Arguments, + _args: &WriteToReadOpts, + tc: TaskCenter, + bifrost: Bifrost, +) -> anyhow::Result<()> { + let log_id = LogId::from(0); + let clock = quanta::Clock::new(); + // Create two tasks, one that writes to the log continously and another one that reads from the + // log and measures the latency. Collect latencies in a histogram and print the histogram + // before the test ends. + let reader_task = tc.spawn(TaskKind::TestRunner, "test-log-reader", None, { + let bifrost = bifrost.clone(); + let clock = clock.clone(); + async move { + let mut read_stream = bifrost.create_reader(log_id, Lsn::OLDEST); + let mut counter = 0; + let mut cancel = std::pin::pin!(cancellation_watcher()); + let mut lag_latencies = Histogram::::new(3)?; + let mut on_record = |record: Result| { + if let Ok(record) = record { + if let Record::Data(data) = record.record { + let latency_raw = data.into_body().get_u64(); + let latency = clock.scaled(latency_raw).elapsed(); + lag_latencies.record(latency.as_nanos() as u64)?; + } else { + panic!("Unexpected non-data record"); + } + } else { + panic!("Unexpected error"); + } + anyhow::Ok(()) + }; + loop { + counter += 1; + tokio::select! { + _ = &mut cancel => { + // test is over. print histogram + info!("Reader stopping"); + break; + } + record = read_stream.read_next() => { + on_record(record)?; + } + } + if counter % 1000 == 0 { + info!("Read {} records", counter); + } + } + + println!("Total records read: {}", counter); + print_latencies("read lag", lag_latencies); + Ok(()) + } + })?; + + let writer_task = tc.spawn(TaskKind::TestRunner, "test-log-appender", None, { + let mut bifrost = bifrost.clone(); + let clock = clock.clone(); + async move { + let mut append_latencies = Histogram::::new(3)?; + let mut counter = 0; + loop { + counter += 1; + let start = Instant::now(); + let mut bytes = BytesMut::default(); + bytes.put_u64(clock.raw()); + let payload = Payload::new(bytes); + if bifrost.append(log_id, payload).await.is_err() { + println!("Total records written: {}", counter); + print_latencies("append latency", append_latencies); + break; + }; + + append_latencies.record(start.elapsed().as_nanos() as u64)?; + if counter % 1000 == 0 { + info!("Appended {} records", counter); + } + } + anyhow::Ok(()) + } + })?; + + tokio::time::sleep(Duration::from_secs(30)).await; + + tc.cancel_task(reader_task).unwrap().await?; + info!("Reader stopped"); + let _ = tc.cancel_task(writer_task); + Ok(()) +} diff --git a/tools/bifrost-benchpress/src/util.rs b/tools/bifrost-benchpress/src/util.rs new file mode 100644 index 0000000000..7ac0a4453b --- /dev/null +++ b/tools/bifrost-benchpress/src/util.rs @@ -0,0 +1,111 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::io::Write; +use std::time::Duration; + +use hdrhistogram::Histogram; +use metrics_exporter_prometheus::PrometheusHandle; +use restate_rocksdb::{DbName, RocksDbManager}; + +pub fn print_latencies(title: &str, histogram: Histogram) { + let mut stdout = std::io::stdout().lock(); + let _ = writeln!(&mut stdout, "Histogram of {}", title); + let _ = writeln!( + &mut stdout, + "P50: {:?}", + Duration::from_nanos(histogram.value_at_percentile(50.0)) + ); + let _ = writeln!( + &mut stdout, + "P90: {:?}", + Duration::from_nanos(histogram.value_at_percentile(90.0)) + ); + let _ = writeln!( + &mut stdout, + "P99: {:?}", + Duration::from_nanos(histogram.value_at_percentile(99.0)) + ); + let _ = writeln!( + &mut stdout, + "P999: {:?}", + Duration::from_nanos(histogram.value_at_percentile(99.9)) + ); + let _ = writeln!( + &mut stdout, + "P100: {:?}", + Duration::from_nanos(histogram.value_at_percentile(100.0)) + ); + let _ = writeln!(&mut stdout, "# of samples: {}", histogram.len()); + let _ = writeln!(&mut stdout); +} + +pub fn print_rocksdb_stats(db_name: &str) { + let db_manager = RocksDbManager::get(); + let db = db_manager.get_db(DbName::new(db_name)).unwrap(); + let stats = db.get_statistics_str(); + let total_wb_usage = db_manager.get_total_write_buffer_usage(); + let wb_capacity = db_manager.get_total_write_buffer_capacity(); + let memory = db_manager + .get_memory_usage_stats(&[DbName::new(db_name)]) + .unwrap(); + let mut stdout = std::io::stdout().lock(); + + let _ = writeln!(&mut stdout); + let _ = writeln!(&mut stdout, "=========================="); + let _ = writeln!(&mut stdout, "Rocksdb Stats of {}", db_name); + let _ = writeln!(&mut stdout, "{}", stats.unwrap()); + + let _ = writeln!( + &mut stdout, + "RocksDB approximate memory usage for all mem-tables: {}", + memory.approximate_mem_table_total(), + ); + let _ = writeln!( + &mut stdout, + "RocksDB approximate memory usage of un-flushed mem-tables: {}", + memory.approximate_mem_table_unflushed(), + ); + + let _ = writeln!( + &mut stdout, + "RocksDB approximate memory usage of all the table readers: {}", + memory.approximate_mem_table_readers_total(), + ); + + let _ = writeln!( + &mut stdout, + "RocksDB approximate memory usage by cache: {}", + memory.approximate_cache_total(), + ); + + let _ = writeln!( + &mut stdout, + "RocksDB total write buffers usage (from write buffers manager): {}", + total_wb_usage + ); + + let _ = writeln!( + &mut stdout, + "RocksDB total write buffers capacity (from write buffers manager): {}", + wb_capacity + ); +} + +pub fn print_prometheus_stats(handle: &PrometheusHandle) { + let mut stdout = std::io::stdout().lock(); + let _ = writeln!(&mut stdout); + let _ = writeln!(&mut stdout, "=========================="); + let _ = writeln!(&mut stdout, "Prometheus Stats"); + let metrics = handle.render(); + + // print the metrics to the terminal. + let _ = writeln!(&mut stdout, "{}", metrics); +}