Skip to content

Commit

Permalink
PP storage integration with rocksdb manager
Browse files Browse the repository at this point in the history
**important change:** This PR introduces a departure from supporting concurrent unit test runs. At this PR, the default number of threads to run tests with `cargo test` is set to 1. However, in a follow-up PR, I'll enable the use of `nextest` in CI.

For our development setup, we should always use `cargo nextest run` for fast parallel (but not in-process concurrent) runs.
  • Loading branch information
AhmedSoliman committed Apr 12, 2024
1 parent 2093562 commit d0401d5
Show file tree
Hide file tree
Showing 17 changed files with 303 additions and 231 deletions.
3 changes: 3 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[env]
RUST_TEST_THREADS = "1"

[alias]
xtask = "run --package xtask --"

Expand Down
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use super::keys::{MetadataKey, MetadataKind};
use super::log_state::{log_state_full_merge, log_state_partial_merge, LogState};
use super::log_store_writer::LogStoreWriter;

pub(crate) const DB_NAME: &str = "local_loglet";
// matches the default directory name
pub(crate) const DB_NAME: &str = "local-loglet";

pub(crate) const DATA_CF: &str = "logstore_data";
pub(crate) const METADATA_CF: &str = "logstore_metadata";

Expand Down
23 changes: 19 additions & 4 deletions crates/rocksdb/src/db_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ static DB_MANAGER: OnceLock<RocksDbManager> = OnceLock::new();
enum WatchdogCommand {
Register(ConfigSubscription),
#[cfg(any(test, feature = "test-util"))]
ResetAll,
ResetAll(tokio::sync::oneshot::Sender<()>),
}

/// Tracks rocksdb databases created by various components, memory budgeting, monitoring, and
Expand Down Expand Up @@ -60,10 +60,15 @@ impl RocksDbManager {
DB_MANAGER.get().expect("DBManager not initialized")
}

/// Create a new instance of the database manager
/// Create a new instance of the database manager. This should not be executed concurrently,
/// only run it once on program startup.
///
/// Must run in task_center scope.
pub fn init(mut base_opts: impl Updateable<CommonOptions> + Send + 'static) -> &'static Self {
// best-effort, it doesn't make concurrent access safe, but it's better than nothing.
if let Some(manager) = DB_MANAGER.get() {
return manager;
}
let opts = base_opts.load();
let cache = Cache::new_lru_cache(opts.rocksdb_total_memory_limit as usize);
let write_buffer_manager = WriteBufferManager::new_write_buffer_manager_with_cache(
Expand Down Expand Up @@ -154,14 +159,22 @@ impl RocksDbManager {
e
);
}
info!(
db = %name,
owner = %owner,
"Opened rocksdb database"
);
Ok(db)
}

#[cfg(any(test, feature = "test-util"))]
pub async fn reset(&self) -> anyhow::Result<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.watchdog_tx
.send(WatchdogCommand::ResetAll)
.send(WatchdogCommand::ResetAll(tx))
.map_err(|_| RocksError::Shutdown(ShutdownError))?;
// safe to unwrap since we use this only in tests
rx.await.unwrap();
Ok(())
}

Expand Down Expand Up @@ -313,7 +326,7 @@ impl DbWatchdog {
async fn handle_command(&mut self, cmd: WatchdogCommand) {
match cmd {
#[cfg(any(test, feature = "test-util"))]
WatchdogCommand::ResetAll => {
WatchdogCommand::ResetAll(response) => {
self.manager
.shutting_down
.store(true, std::sync::atomic::Ordering::Release);
Expand All @@ -323,6 +336,8 @@ impl DbWatchdog {
self.manager
.shutting_down
.store(false, std::sync::atomic::Ordering::Release);
// safe to unwrap since we use this only in tests
response.send(()).unwrap();
}
WatchdogCommand::Register(sub) => self.subscriptions.push(sub),
}
Expand Down
19 changes: 19 additions & 0 deletions crates/rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,25 @@ impl DbSpec<rocksdb::DB> {
}
}

impl DbSpec<rocksdb::OptimisticTransactionDB> {
pub fn new_optimistic_db(
name: DbName,
owner: Owner,
path: PathBuf,
db_options: rocksdb::Options,
column_families: Vec<(CfName, rocksdb::Options)>,
) -> DbSpec<rocksdb::OptimisticTransactionDB> {
Self {
name,
owner,
path,
db_options,
column_families,
_phantom: std::marker::PhantomData,
}
}
}

#[derive(derive_more::Display, Clone)]
#[display(fmt = "{}::{}", owner, name)]
pub struct RocksDb {
Expand Down
5 changes: 4 additions & 1 deletion crates/storage-query-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ tokio = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
restate-types = { workspace = true, features = ["test-util"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-invoker-api = { workspace = true, features = ["mocks"] }
restate-rocksdb = { workspace = true, features = ["test-util"] }
restate-schema-api = { workspace = true, features = ["mocks"] }
restate-types = { workspace = true, features = ["test-util"] }

drain = { workspace = true }
googletest = { workspace = true }
9 changes: 8 additions & 1 deletion crates/storage-query-datafusion/src/idempotency/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@ use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use googletest::all;
use googletest::prelude::{assert_that, eq};
use restate_core::TaskCenterBuilder;
use restate_storage_api::idempotency_table::{IdempotencyMetadata, IdempotencyTable};
use restate_storage_api::Transaction;
use restate_types::identifiers::{IdempotencyId, InvocationId};

#[tokio::test]
async fn get_idempotency_key() {
let (mut engine, shutdown) = MockQueryEngine::default();
let tc = TaskCenterBuilder::default()
.default_runtime_handle(tokio::runtime::Handle::current())
.build()
.expect("task_center builds");
let (mut engine, shutdown) = tc
.run_in_scope("mock-query-engine", None, MockQueryEngine::create())
.await;

let mut tx = engine.rocksdb_mut().transaction();
let invocation_id_1 = InvocationId::mock_random();
Expand Down
28 changes: 15 additions & 13 deletions crates/storage-query-datafusion/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::SendableRecordBatchStream;
use googletest::matcher::{Matcher, MatcherResult};
use restate_core::task_center;
use restate_invoker_api::status_handle::mocks::MockStatusHandle;
use restate_rocksdb::RocksDbManager;
use restate_schema_api::component::mocks::MockComponentMetadataResolver;
use restate_schema_api::component::{ComponentMetadata, ComponentMetadataResolver, ComponentType};
use restate_schema_api::deployment::mocks::MockDeploymentMetadataRegistry;
use restate_schema_api::deployment::{Deployment, DeploymentResolver};
use restate_storage_rocksdb::RocksDBStorage;
use restate_types::arc_util::Constant;
use restate_types::config::{QueryEngineOptions, WorkerOptions};
use restate_types::config::{CommonOptions, QueryEngineOptions, WorkerOptions};
use restate_types::identifiers::{ComponentRevision, DeploymentId};
use std::fmt::Debug;
use std::future::Future;
Expand Down Expand Up @@ -81,18 +83,18 @@ impl DeploymentResolver for MockSchemas {
pub(crate) struct MockQueryEngine(RocksDBStorage, QueryContext);

impl MockQueryEngine {
pub fn default() -> (Self, impl Future<Output = ()>) {
Self::new(MockSchemas::default(), MockStatusHandle::default())
}

pub fn new(
mock_schemas: MockSchemas,
mock_status: MockStatusHandle,
) -> (Self, impl Future<Output = ()>) {
pub async fn create() -> (Self, impl Future<Output = ()>) {
// Prepare Rocksdb
task_center().run_in_scope_sync("db-manager-init", None, || {
RocksDbManager::init(Constant::new(CommonOptions::default()))
});
let worker_options = WorkerOptions::default();
let (rocksdb, writer) = RocksDBStorage::new(Constant::new(worker_options))
.expect("RocksDB storage creation should succeed");
let (rocksdb, writer) = RocksDBStorage::open(
worker_options.data_dir(),
Constant::new(worker_options.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");
let (signal, watch) = drain::channel();
let writer_join_handle = writer.run(watch);

Expand All @@ -101,8 +103,8 @@ impl MockQueryEngine {
QueryContext::from_options(
&QueryEngineOptions::default(),
rocksdb,
mock_status,
mock_schemas,
MockStatusHandle::default(),
MockSchemas::default(),
)
.unwrap(),
);
Expand Down
4 changes: 4 additions & 0 deletions crates/storage-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ default = []
options_schema = ["dep:schemars"]

[dependencies]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-rocksdb = { workspace = true }
restate-storage-api = { workspace = true, features = ["serde"] }
restate-storage-proto = { workspace = true, features = ["conversion"] }
restate-types = { workspace = true }
Expand All @@ -38,6 +40,8 @@ once_cell = "1.18.0"
log = "0.4.20"

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
restate-rocksdb = { workspace = true, features = ["test-util"] }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }

Expand Down
26 changes: 22 additions & 4 deletions crates/storage-rocksdb/benches/basic_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,26 @@
// by the Apache License, Version 2.0.

use criterion::{criterion_group, criterion_main, Criterion};
use restate_core::TaskCenterBuilder;
use restate_rocksdb::RocksDbManager;
use restate_storage_api::deduplication_table::DeduplicationTable;
use restate_storage_api::Transaction;
use restate_storage_rocksdb::RocksDBStorage;
use restate_types::arc_util::Constant;
use restate_types::config::WorkerOptions;
use restate_types::config::{CommonOptions, WorkerOptions};
use restate_types::dedup::{DedupSequenceNumber, ProducerId};
use tokio::runtime::Builder;

async fn writing_to_rocksdb(worker_options: WorkerOptions) {
//
// setup
//
let (mut rocksdb, writer) = RocksDBStorage::new(Constant::new(worker_options))
.expect("RocksDB storage creation should succeed");
let (mut rocksdb, writer) = RocksDBStorage::open(
worker_options.data_dir(),
Constant::new(worker_options.rocksdb),
)
.await
.expect("RocksDB storage creation should succeed");

let (signal, watch) = drain::channel();
let writer_join_handler = writer.run(watch);
Expand All @@ -44,16 +50,28 @@ async fn writing_to_rocksdb(worker_options: WorkerOptions) {
}

fn basic_writing_reading_benchmark(c: &mut Criterion) {
let rt = Builder::new_multi_thread().enable_all().build().unwrap();

let tc = TaskCenterBuilder::default()
.default_runtime_handle(rt.handle().clone())
.build()
.expect("task_center builds");

tc.run_in_scope_sync("db-manager-init", None, || {
RocksDbManager::init(Constant::new(CommonOptions::default()))
});
let mut group = c.benchmark_group("RocksDB");
group.sample_size(10).bench_function("writing", |bencher| {
// This will generate a temp dir since we have test-util feature enabled
let worker_options = WorkerOptions::default();
bencher
.to_async(Builder::new_multi_thread().enable_all().build().unwrap())
.to_async(&rt)
.iter(|| writing_to_rocksdb(worker_options.clone()));
});

rt.block_on(RocksDbManager::get().shutdown());
group.finish();
rt.block_on(tc.shutdown_node("completed", 0));
}

criterion_group!(benches, basic_writing_reading_benchmark);
Expand Down
Loading

0 comments on commit d0401d5

Please sign in to comment.