Skip to content

Commit

Permalink
Rename storage-rocksdb to partition-store
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Apr 29, 2024
1 parent 08fb8f4 commit 8bcdd4b
Show file tree
Hide file tree
Showing 51 changed files with 227 additions and 90 deletions.
91 changes: 45 additions & 46 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ restate-network = { path = "crates/network" }
restate-node = { path = "crates/node" }
restate-node-protocol = { path = "crates/node-protocol" }
restate-node-services = { path = "crates/node-services" }
restate-partition-store = { path = "crates/partition-store" }
restate-queue = { path = "crates/queue" }
restate-rocksdb = { path = "crates/rocksdb" }
restate-schema = { path = "crates/schema" }
Expand All @@ -60,7 +61,6 @@ restate-service-protocol = { path = "crates/service-protocol" }
restate-storage-api = { path = "crates/storage-api" }
restate-storage-query-datafusion = { path = "crates/storage-query-datafusion" }
restate-storage-query-postgres = { path = "crates/storage-query-postgres" }
restate-storage-rocksdb = { path = "crates/storage-rocksdb" }
restate-test-util = { path = "crates/test-util" }
restate-timer = { path = "crates/timer" }
restate-timer-queue = { path = "crates/timer-queue" }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "restate-storage-rocksdb"
name = "restate-partition-store"
version.workspace = true
authors.workspace = true
edition.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use std::ops::RangeInclusive;

use criterion::{criterion_group, criterion_main, Criterion};
use restate_core::TaskCenterBuilder;
use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_rocksdb::RocksDbManager;
use restate_storage_api::deduplication_table::{
DedupSequenceNumber, DeduplicationTable, ProducerId,
};
use restate_storage_api::Transaction;
use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_types::arc_util::Constant;
use restate_types::config::{CommonOptions, WorkerOptions};
use restate_types::identifiers::PartitionKey;
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub trait TableKey: Sized + std::fmt::Debug + Send + 'static {
/// This macro expands to:
/// ```ignore
/// use bytes::{Buf, BufMut, Bytes};
/// use restate_storage_rocksdb::TableKind;
/// use restate_partition_store::TableKind;
/// #[derive(Debug, Eq, PartialEq)]
/// pub struct FooBarKey {
/// pub foo: Option<u32>,
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
138 changes: 138 additions & 0 deletions crates/partition-store/src/scoped_db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use bytes::BytesMut;
use enum_map::{enum_map, EnumMap};
use restate_rocksdb::{CfName, RocksDb};
use restate_types::identifiers::PartitionId;
use rocksdb::{BoundColumnFamily, DBCompressionType};

use crate::TableKind;

pub const PARTITION_DATA_CF_PREFIX: &str = "partition-data-";
pub const PARTITION_KV_CF_PREFIX: &str = "partition-kv-";

pub struct PartitionScopedDb<'a> {
partition_id: PartitionId,
rocksdb: &'a Arc<RocksDb>,
table_to_cf: EnumMap<TableKind, Arc<BoundColumnFamily<'a>>>,
key_buffer: BytesMut,
value_buffer: BytesMut,
}

impl<'a> Clone for PartitionScopedDb<'a> {
fn clone(&self) -> Self {
Self {
partition_id: self.partition_id.clone(),
rocksdb: self.rocksdb,
table_to_cf: self.table_to_cf.clone(),
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
}
}
}

impl<'a> PartitionScopedDb<'a> {
pub fn new(partition_id: PartitionId, rocksdb: &'a Arc<RocksDb>) -> Option<Self> {
let data_cf = rocksdb.cf_handle(&data_cf_for_partition(&partition_id))?;
let kv_cf = rocksdb.cf_handle(&kv_cf_for_partition(&partition_id))?;

let table_to_cf = enum_map! {
// optimized for point lookups
TableKind::State => kv_cf.clone(),
TableKind::ServiceStatus => kv_cf.clone(),
TableKind::InvocationStatus => kv_cf.clone(),
TableKind::Idempotency => kv_cf.clone(),
TableKind::Deduplication => kv_cf.clone(),
TableKind::PartitionStateMachine => kv_cf.clone(),
// optimized for prefix scan lookups
TableKind::Journal => data_cf.clone(),
TableKind::Inbox => data_cf.clone(),
TableKind::Outbox => data_cf.clone(),
TableKind::Timers => data_cf.clone(),
};

Some(Self {
partition_id,
rocksdb,
table_to_cf,
key_buffer: BytesMut::default(),
value_buffer: BytesMut::default(),
})
}

pub(crate) fn table_handle(&self, table_kind: TableKind) -> &Arc<BoundColumnFamily> {
&self.table_to_cf[table_kind]
}
}

pub(crate) fn data_cf_for_partition(partition_id: &PartitionId) -> CfName {
CfName::from(format!("{}{}", PARTITION_DATA_CF_PREFIX, partition_id))
}

pub(crate) fn kv_cf_for_partition(partition_id: &PartitionId) -> CfName {
CfName::from(format!("{}{}", PARTITION_KV_CF_PREFIX, partition_id))
}

pub(crate) fn partition_ids_to_cfs(partition_ids: &[PartitionId]) -> Vec<CfName> {
partition_ids
.iter()
.map(data_cf_for_partition)
.chain(partition_ids.iter().map(kv_cf_for_partition))
.collect()
}

pub(crate) fn data_cf_options(mut cf_options: rocksdb::Options) -> rocksdb::Options {
// Most of the changes are highly temporal, we try to delay flushing
// As much as we can to increase the chances to observe a deletion.
//
cf_options.set_max_write_buffer_number(3);
cf_options.set_min_write_buffer_number_to_merge(2);
//
// Set compactions per level
//
cf_options.set_num_levels(7);
cf_options.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Zstd,
]);

cf_options
}

pub(crate) fn kv_cf_options(mut cf_options: rocksdb::Options) -> rocksdb::Options {
// Most of the changes are highly temporal, we try to delay flushing
// As much as we can to increase the chances to observe a deletion.
//
cf_options.set_max_write_buffer_number(3);
cf_options.set_min_write_buffer_number_to_merge(2);
//
// Set compactions per level
//
cf_options.set_num_levels(7);
cf_options.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Snappy,
DBCompressionType::Zstd,
]);

cf_options
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

use crate::{assert_stream_eq, mock_state_mutation};
use once_cell::sync::Lazy;
use restate_partition_store::PartitionStore;
use restate_storage_api::inbox_table::{InboxEntry, InboxTable, SequenceNumberInboxEntry};
use restate_storage_api::Transaction;
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::{InvocationId, ServiceId};

static INBOX_ENTRIES: Lazy<Vec<SequenceNumberInboxEntry>> = Lazy::new(|| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
use bytes::Bytes;
use futures::Stream;
use restate_core::TaskCenterBuilder;
use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_rocksdb::RocksDbManager;
use restate_storage_api::StorageError;
use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_types::arc_util::Constant;
use restate_types::config::{CommonOptions, WorkerOptions};
use restate_types::identifiers::{InvocationId, PartitionKey, ServiceId};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use super::assert_stream_eq;

use bytestring::ByteString;
use once_cell::sync::Lazy;
use restate_partition_store::PartitionStore;
use restate_storage_api::invocation_status_table::{
InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, JournalMetadata,
StatusTimestamps,
};
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::InvocationId;
use restate_types::invocation::{
HandlerType, InvocationTarget, ServiceInvocationSpanContext, Source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use bytes::Bytes;
use bytestring::ByteString;
use futures_util::StreamExt;
use once_cell::sync::Lazy;
use restate_partition_store::PartitionStore;
use restate_storage_api::journal_table::{JournalEntry, JournalTable};
use restate_storage_api::Transaction;
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::{InvocationId, InvocationUuid};
use restate_types::invocation::{InvocationTarget, ServiceInvocationSpanContext};
use restate_types::journal::enriched::{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
// by the Apache License, Version 2.0.

use crate::mock_random_service_invocation;
use restate_partition_store::PartitionStore;
use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable};
use restate_storage_api::Transaction;
use restate_storage_rocksdb::PartitionStore;

fn mock_outbox_message() -> OutboxMessage {
OutboxMessage::ServiceInvocation(mock_random_service_invocation())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

use crate::{assert_stream_eq, storage_test_environment};
use bytes::Bytes;
use restate_partition_store::PartitionStore;
use restate_storage_api::state_table::{ReadOnlyStateTable, StateTable};
use restate_storage_api::Transaction;
use restate_storage_rocksdb::PartitionStore;
use restate_types::identifiers::ServiceId;

async fn populate_data<T: StateTable>(table: &mut T) {
Expand Down
Loading

0 comments on commit 8bcdd4b

Please sign in to comment.